Spaces:
Configuration error
Configuration error
| # EMMA Rover Pipeline | |
| import os | |
| import csv | |
| import gc | |
| import cv2 | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import Dataset, DataLoader | |
| from ncps.torch import LTC | |
| from matplotlib.animation import FuncAnimation | |
| try: | |
| import psutil | |
| _HAS_PSUTIL = True | |
| except Exception: | |
| _HAS_PSUTIL = False | |
| try: | |
| from moviepy.editor import VideoFileClip | |
| except Exception: | |
| from moviepy import VideoFileClip | |
| import librosa | |
| # Set device for computation (GPU if available, otherwise CPU) | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| # Global variable to store the number of features per timestep | |
| Nloop = 0 | |
| def check_memory_usage(): | |
| """ | |
| Monitor system memory usage during processing. | |
| This function provides lightweight memory monitoring to help track resource | |
| consumption during video and audio processing. Useful for long-running | |
| pipeline operations to ensure system stability. | |
| Why: Prevent memory overflow during large video processing | |
| What: Display current memory usage statistics | |
| """ | |
| if not _HAS_PSUTIL: | |
| return | |
| mem = psutil.virtual_memory() | |
| used_gb = mem.used / (1024**3) | |
| total_gb = mem.total / (1024**3) | |
| print(f"[INFO] Memory usage: {used_gb:.1f}GB / {total_gb:.1f}GB ({mem.percent:.1f}%)") | |
| class DroneDetector: | |
| """ | |
| Rover detector using YOLO with improved tracking capabilities. | |
| This class implements the core object detection functionality for the rover pipeline. | |
| It uses YOLO (You Only Look Once) neural network for real-time rover detection | |
| in video frames with intelligent tracking and filtering mechanisms. | |
| Key Features: | |
| - YOLO-based object detection | |
| - Confidence-based filtering | |
| - Size-based detection validation | |
| - Edge proximity filtering | |
| - Tracking consistency (distance-based selection) | |
| Why: Accurate rover detection is critical for trajectory analysis | |
| What: Detects rover bounding boxes with confidence scores | |
| """ | |
| def __init__(self, weights_path, conf=0.15, imgsz=640): | |
| """ | |
| Initialize the rover detector with YOLO model. | |
| Args: | |
| weights_path: Path to YOLO model weights file (.pt) | |
| conf: Detection confidence threshold (0.0-1.0) | |
| imgsz: Input image size for YOLO processing | |
| """ | |
| from ultralytics import YOLO | |
| self.model = YOLO(weights_path) | |
| self.conf = conf | |
| self.imgsz = imgsz | |
| self.last_detection = None | |
| print(f"[INFO] Loaded YOLO weights: {weights_path}") | |
| def detect(self, frame): | |
| """ | |
| Detect rover in a single video frame with intelligent filtering. | |
| This method performs the core detection logic: | |
| 1. Run YOLO inference on the input frame | |
| 2. Filter detections by size and edge proximity | |
| 3. Select best candidate using confidence and tracking consistency | |
| Args: | |
| frame: Input video frame (numpy array) | |
| Returns: | |
| tuple: (x1, y1, x2, y2, confidence) or None if no detection | |
| Why: Multi-stage filtering ensures reliable rover detection | |
| What: Returns best rover bounding box with confidence score | |
| """ | |
| h, w = frame.shape[:2] | |
| img_area = w * h | |
| edge_thresh = max(10, int(0.01 * min(w, h))) | |
| min_area_px = max(100, int(0.00001 * img_area)) | |
| max_area_px = int(0.8 * img_area) | |
| # Run YOLO inference | |
| results = self.model.predict( | |
| source=frame, imgsz=self.imgsz, conf=self.conf, | |
| iou=0.5, agnostic_nms=True, verbose=False | |
| ) | |
| # Filter and validate detections | |
| candidates = [] | |
| for r in results: | |
| if r.boxes is None: | |
| continue | |
| for b in r.boxes: | |
| x1, y1, x2, y2 = [float(v) for v in b.xyxy[0].tolist()] | |
| bw, bh = x2 - x1, y2 - y1 | |
| area = bw * bh | |
| # Size-based filtering (too small or too large) | |
| if area < min_area_px or area > max_area_px: | |
| continue | |
| # Edge proximity filtering (avoid edge artifacts) | |
| if (x1 < 5 or y1 < 5 or x2 > w - 5 or y2 > h - 5): | |
| continue | |
| conf = float(b.conf[0].item()) if hasattr(b, "conf") else 0.0 | |
| candidates.append((x1, y1, x2, y2, conf)) | |
| if not candidates: | |
| return None | |
| if len(candidates) == 1: | |
| return candidates[0] | |
| # Multi-candidate selection with tracking consistency | |
| candidates.sort(key=lambda t: t[4], reverse=True) | |
| top_candidates = candidates[:3] | |
| # Use tracking consistency if previous detection exists | |
| if self.last_detection is not None: | |
| lx1, ly1, lx2, ly2, _ = self.last_detection | |
| lcx, lcy = (lx1 + lx2) / 2.0, (ly1 + ly2) / 2.0 | |
| def dist(c): | |
| cx, cy = (c[0] + c[2]) / 2.0, (c[1] + c[3]) / 2.0 | |
| return ((cx - lcx) ** 2 + (cy - lcy) ** 2) ** 0.5 | |
| return min(top_candidates, key=dist) | |
| return top_candidates[0] | |
| class Kalman3D: | |
| """ | |
| 3D Kalman Filter for rover trajectory smoothing and prediction. | |
| This class implements a 3D Kalman filter to smooth rover position measurements | |
| and predict rover position when detection fails. The filter tracks position | |
| and velocity in 3D space (x, y, z coordinates). | |
| State Vector: [x, y, z, vx, vy, vz] (position + velocity) | |
| Measurement: [x, y, z] (position only from detection) | |
| Why: Raw detections are noisy and may have gaps | |
| What: Provides smooth, continuous trajectory estimates | |
| """ | |
| def __init__(self, dt=0.01): | |
| """ | |
| Initialize 3D Kalman filter with system dynamics. | |
| Args: | |
| dt: Time step between measurements (seconds) | |
| """ | |
| self.dt = dt | |
| # State vector: [x, y, z, vx, vy, vz] | |
| self.state = np.zeros(6) | |
| # State transition matrix F (constant velocity model) | |
| self.F = np.eye(6) | |
| self.F[0, 3] = dt # x += vx * dt | |
| self.F[1, 4] = dt # y += vy * dt | |
| self.F[2, 5] = dt # z += vz * dt | |
| # Measurement matrix H (we measure position only) | |
| self.H = np.eye(3, 6) | |
| # Process noise covariance Q (uncertainty in motion model) | |
| self.Q = np.eye(6) * 0.1 | |
| # Measurement noise covariance R (uncertainty in measurements) | |
| self.R = np.eye(3) * 1.0 | |
| # Error covariance matrix P (uncertainty in state estimate) | |
| self.P = np.eye(6) * 100.0 | |
| def predict(self): | |
| """ | |
| Predict next state using motion model (no measurement). | |
| Returns: | |
| np.array: Predicted state vector [x, y, z, vx, vy, vz] | |
| Why: Estimate rover position when detection fails | |
| What: Advances state using constant velocity model | |
| """ | |
| self.state = self.F @ self.state | |
| self.P = self.F @ self.P @ self.F.T + self.Q | |
| return self.state | |
| def update(self, measurement): | |
| """ | |
| Update state estimate with new measurement. | |
| Args: | |
| measurement: [x, y, z] position measurement from detection | |
| Returns: | |
| np.array: Updated state vector [x, y, z, vx, vy, vz] | |
| Why: Incorporate new measurements to improve accuracy | |
| What: Combines prediction with measurement using Kalman equations | |
| """ | |
| y = measurement - self.H @ self.state # Innovation (measurement residual) | |
| S = self.H @ self.P @ self.H.T + self.R # Innovation covariance | |
| K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain | |
| self.state = self.state + K @ y # Update state estimate | |
| self.P = (np.eye(6) - K @ self.H) @ self.P # Update error covariance | |
| return self.state | |
| class DepthEstimator: | |
| def __init__(self, focal_length=500, real_width=0.2): | |
| self.focal_length = focal_length | |
| self.real_width = real_width | |
| def z_from_bbox(self, bbox): | |
| x1, y1, x2, y2 = bbox | |
| width_px = x2 - x1 | |
| if width_px > 0: | |
| z = (self.real_width * self.focal_length) / width_px | |
| return max(0.001, min(10.0, z)) | |
| return 0.1 | |
| class RoverAudioFeatures: | |
| """ | |
| Audio feature extraction for rover motor analysis. | |
| This class extracts meaningful audio features from rover video that correlate | |
| with motor commands and rover movement. It analyzes the acoustic signature | |
| of rover motors to infer motor speeds and movement patterns. | |
| Key Features Extracted: | |
| - RMS Energy: Overall audio intensity (motor power indicator) | |
| - Spectral Centroid: Average frequency (motor speed indicator) | |
| - Peak Frequency: Dominant frequency in 80-3000 Hz band (motor RPM) | |
| - Motor Commands: Converted to omega_r, omega_l wheel speeds | |
| Why: Audio contains rich information about rover motor states | |
| What: Extracts motor-relevant features from acoustic data | |
| """ | |
| def __init__(self, wav_path=None, sr_target=22050): | |
| """ | |
| Initialize audio feature extractor. | |
| Args: | |
| wav_path: Path to audio file (optional) | |
| sr_target: Target sample rate for audio processing | |
| """ | |
| self.wav_path = wav_path | |
| self.sr_target = sr_target | |
| self.t = None # Time array for features | |
| self.rms = None # RMS energy over time | |
| self.centroid_hz = None # Spectral centroid over time | |
| self.peak_hz = None # Peak frequency over time | |
| def extract_wav_from_video(video_path, out_wav, fps=44100): | |
| clip = VideoFileClip(video_path) | |
| if clip.audio is None: | |
| raise RuntimeError("No audio track in video.") | |
| clip.audio.write_audiofile(out_wav, fps=fps) | |
| return out_wav | |
| def compute(self, wav_path): | |
| y, sr = librosa.load(wav_path, sr=self.sr_target, mono=True) | |
| n_fft = 2048 | |
| hop_length = 512 | |
| S = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))**2 | |
| freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft) | |
| rms = librosa.feature.rms(y=y, frame_length=n_fft, hop_length=hop_length).flatten() | |
| centroid = librosa.feature.spectral_centroid(S=S, sr=sr).flatten() | |
| lo = np.searchsorted(freqs, 80) | |
| hi = np.searchsorted(freqs, 3000) | |
| band = S[lo:hi, :] | |
| peak_bin = np.argmax(band, axis=0) | |
| peak_freq = freqs[lo + peak_bin] | |
| times = librosa.frames_to_time(np.arange(len(rms)), sr=sr, hop_length=hop_length) | |
| self.t = times | |
| self.rms = rms | |
| self.centroid_hz = centroid | |
| self.peak_hz = peak_freq | |
| def convert_to_rover_motors(self, peak_hz, movement_type="straight", turn_intensity=0.0): | |
| if np.isnan(peak_hz) or peak_hz <= 0: | |
| return 0.0, 0.0 | |
| base_speed = peak_hz * 0.8 | |
| max_differential = 0.4 | |
| if movement_type == "straight": | |
| omega_r = base_speed | |
| omega_l = base_speed | |
| elif movement_type == "turn_right": | |
| differential = turn_intensity * max_differential / 0.3 | |
| omega_r = base_speed * (1.0 - differential) | |
| omega_l = base_speed * (1.0 + differential) | |
| elif movement_type == "turn_left": | |
| differential = turn_intensity * max_differential / 0.3 | |
| omega_r = base_speed * (1.0 + differential) | |
| omega_l = base_speed * (1.0 - differential) | |
| elif movement_type == "stop": | |
| omega_r = 0.0 | |
| omega_l = 0.0 | |
| else: | |
| omega_r = base_speed | |
| omega_l = base_speed | |
| return float(omega_r), float(omega_l) | |
| def detect_movement_type(self, trajectory_data, frame_idx): | |
| if frame_idx < 5: | |
| return "straight", 0.0 | |
| start_idx = max(0, frame_idx - 15) | |
| end_idx = frame_idx + 1 | |
| if end_idx > len(trajectory_data): | |
| return "straight", 0.0 | |
| recent_x = trajectory_data[start_idx:end_idx, 0] | |
| recent_y = trajectory_data[start_idx:end_idx, 1] | |
| if len(recent_x) < 3: | |
| return "straight", 0.0 | |
| dx = np.diff(recent_x) | |
| dy = np.diff(recent_y) | |
| headings = np.arctan2(dy, dx) | |
| heading_changes = np.diff(headings) | |
| heading_changes = np.arctan2(np.sin(heading_changes), np.cos(heading_changes)) | |
| avg_angular_velocity = np.mean(np.abs(heading_changes)) | |
| avg_heading_change = np.mean(heading_changes) | |
| turn_threshold = 0.05 | |
| if avg_angular_velocity < turn_threshold: | |
| return "straight", 0.0 | |
| return ("turn_left", min(abs(avg_heading_change), 0.3)) if avg_heading_change > 0 else ("turn_right", min(abs(avg_heading_change), 0.3)) | |
| def value_at_time(self, t_sec, frame_idx, total_frames, trajectory_data=None): | |
| if self.t is None: | |
| return (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan) | |
| def interp(arr): | |
| return float(np.interp(t_sec, self.t, arr)) | |
| rms_val = interp(self.rms) | |
| centroid_val = interp(self.centroid_hz) | |
| peak_val = interp(self.peak_hz) | |
| if trajectory_data is not None: | |
| movement_type, turn_intensity = self.detect_movement_type(trajectory_data, frame_idx) | |
| else: | |
| movement_type, turn_intensity = "straight", 0.0 | |
| omega_r, omega_l = self.convert_to_rover_motors(peak_val, movement_type, turn_intensity) | |
| return (rms_val, centroid_val, peak_val, omega_r, omega_l, movement_type) | |
| def process_video(video_path, weights_path, output_video, output_csv, conf=0.15): | |
| """ | |
| Process video to extract rover trajectory and create annotated video. | |
| This is the core video processing function that: | |
| 1. Loads video and YOLO model | |
| 2. Detects rover in each frame | |
| 3. Tracks trajectory using Kalman filtering | |
| 4. Creates annotated video with trajectory overlay | |
| 5. Saves trajectory data and generates plots | |
| Args: | |
| video_path: Path to input video file | |
| weights_path: Path to YOLO model weights | |
| output_video: Path for annotated video output | |
| output_csv: Path for trajectory CSV output | |
| conf: YOLO detection confidence threshold | |
| Why: Video processing is the foundation of trajectory analysis | |
| What: Extracts smooth rover trajectory from raw video frames | |
| """ | |
| print(f"[STEP 1] Processing video: {video_path}") | |
| print(f"[STEP 1] Output video: {output_video}") | |
| print(f"[STEP 1] Output CSV: {output_csv}") | |
| os.makedirs(os.path.dirname(output_csv), exist_ok=True) | |
| detector = DroneDetector(weights_path, conf=conf) | |
| kf = Kalman3D() | |
| depth_est = DepthEstimator() | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise RuntimeError(f"Cannot open video: {video_path}") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_video, fourcc, fps, (width, height)) | |
| csv_f = open(output_csv, "w", newline="") | |
| csvw = csv.writer(csv_f) | |
| csvw.writerow(["frame", "time_s", "x", "y", "z", "conf"]) | |
| x_series, y_series, z_series = [], [], [] | |
| frame_idx = 0 | |
| while True: | |
| ok, frame = cap.read() | |
| if not ok: | |
| break | |
| frame_time = frame_idx / fps | |
| det = detector.detect(frame) | |
| if det is not None: | |
| x1, y1, x2, y2, conf_val = det | |
| cx = (x1 + x2) / 2.0 | |
| cy = (y1 + y2) / 2.0 | |
| z = 0.0 # Force planar assumption: ignore depth | |
| kf.predict() | |
| xs = kf.update(np.array([cx, cy, z], dtype=float)).squeeze() | |
| xk, yk, zk = float(xs[0]), float(xs[1]), float(xs[2]) | |
| x_series.append(xk) | |
| y_series.append(yk) | |
| z_series.append(zk) | |
| cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2) | |
| cv2.circle(frame, (int(xk), int(yk)), 6, (0, 0, 255), -1) | |
| cv2.putText(frame, f"x={xk:.1f}, y={yk:.1f}, z={zk:.3f}, conf={conf_val:.2f}", | |
| (int(x1), max(20, int(y1) - 8)), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{xk:.2f}", f"{yk:.2f}", f"{zk:.5f}", f"{conf_val:.3f}"]) | |
| else: | |
| xs = kf.predict().squeeze() | |
| xk, yk, zk = float(xs[0]), float(xs[1]), float(xs[2]) | |
| x_series.append(xk) | |
| y_series.append(yk) | |
| z_series.append(zk) | |
| cv2.circle(frame, (int(xk), int(yk)), 5, (0, 255, 255), -1) | |
| csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{xk:.2f}", f"{yk:.2f}", f"{zk:.5f}", "0.000"]) | |
| out.write(frame) | |
| frame_idx += 1 | |
| if frame_idx % 30 == 0: | |
| print(f"[PROGRESS] Processed {frame_idx} frames") | |
| check_memory_usage() | |
| cap.release() | |
| out.release() | |
| csv_f.close() | |
| if x_series and y_series and z_series: | |
| x_arr = np.array(x_series) | |
| y_arr = np.array(y_series) | |
| z_arr = np.array(z_series) | |
| # Match main.py behavior (N x 100 matrices for memory optimization) | |
| X = np.tile(x_arr.reshape(-1), (100, 1)).T | |
| Y = np.tile(y_arr.reshape(-1), (100, 1)).T | |
| Z = np.tile(z_arr.reshape(-1), (100, 1)).T | |
| os.makedirs("data", exist_ok=True) | |
| np.savetxt("data/xData.txt", X, fmt='%.6f') | |
| np.savetxt("data/yData.txt", Y, fmt='%.6f') | |
| np.savetxt("data/zData.txt", Z, fmt='%.6f') | |
| del X, Y, Z, x_arr, y_arr, z_arr | |
| gc.collect() | |
| print(f"[STEP 1] ✅ Saved trajectory data: {len(x_series)} frames") | |
| # Create trajectory plot and save in output directory | |
| print("[STEP 1] Creating trajectory plot...") | |
| plt.figure(figsize=(10, 8)) | |
| plt.plot(x_series, y_series, 'b-', linewidth=2, label='Rover Trajectory') | |
| plt.plot(x_series[0], y_series[0], 'go', markersize=10, label='Start') | |
| plt.plot(x_series[-1], y_series[-1], 'rs', markersize=10, label='End') | |
| plt.xlabel('X Position (pixels)') | |
| plt.ylabel('Y Position (pixels)') | |
| plt.title('Rover Trajectory in X-Y Plane (Pixel Coordinates)') | |
| plt.grid(True, alpha=0.3) | |
| plt.legend() | |
| plt.axis('equal') | |
| plt.tight_layout() | |
| # Save plot in output directory | |
| output_dir = os.path.dirname(output_video) | |
| if not output_dir: | |
| output_dir = "output" | |
| os.makedirs(output_dir, exist_ok=True) | |
| plot_path = os.path.join(output_dir, 'xy_trajectory_plot.png') | |
| plt.savefig(plot_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| print(f"[STEP 1] ✅ Saved trajectory plot: {plot_path}") | |
| print(f"[STEP 1] ✅ COMPLETED!") | |
| print(f"[STEP 1] Output files:") | |
| print(f" - Video: {output_video}") | |
| print(f" - CSV: {output_csv}") | |
| print(f" - xData.txt, yData.txt, zData.txt in data/") | |
| print(f" - xy_trajectory_plot.png in output directory") | |
| def process_audio(video_path, trajectory_csv, output_csv, output_dir="data"): | |
| """ | |
| Process audio from video and correlate with trajectory data. | |
| This function performs the audio analysis pipeline: | |
| 1. Extracts audio from video file | |
| 2. Computes audio features (RMS, spectral centroid, peak frequency) | |
| 3. Correlates audio features with rover trajectory | |
| 4. Generates motor commands based on audio analysis | |
| 5. Creates audio analysis plots and saves motor data | |
| Args: | |
| video_path: Path to input video file (for audio extraction) | |
| trajectory_csv: Path to trajectory CSV from video processing | |
| output_csv: Path for combined trajectory+audio CSV output | |
| output_dir: Directory for motor command files | |
| Why: Audio provides motor state information not visible in video | |
| What: Correlates acoustic signatures with rover movement patterns | |
| """ | |
| print(f"[STEP 2] Processing audio from: {video_path}") | |
| print(f"[STEP 2] Using trajectory from: {trajectory_csv}") | |
| os.makedirs(output_dir, exist_ok=True) | |
| tmp_wav = "tmp_audio.wav" | |
| import pandas as pd | |
| try: | |
| print("[STEP 2] Extracting audio from video...") | |
| RoverAudioFeatures.extract_wav_from_video(video_path, tmp_wav, fps=44100) | |
| print("[STEP 2] Computing audio features...") | |
| raf = RoverAudioFeatures(tmp_wav, sr_target=22050) | |
| raf.compute(tmp_wav) | |
| have_audio = True | |
| print("[STEP 2] ✅ Audio processing successful") | |
| except Exception as e: | |
| print(f"[STEP 2] ❌ Audio processing failed: {e}") | |
| have_audio = False | |
| raf = None | |
| print("[STEP 2] Reading trajectory data...") | |
| df = pd.read_csv(trajectory_csv) | |
| total_frames = len(df) | |
| print(f"[STEP 2] Found {total_frames} trajectory points") | |
| trajectory_data = df[['x', 'y']].values | |
| # Auto-calibrate alpha: map peak Hz -> wheel rad/s by matching heading rate | |
| alpha = 1.0 | |
| if have_audio and total_frames > 5: | |
| try: | |
| # Compute heading rate from image-plane path | |
| t_arr = df['time_s'].values.astype(float) | |
| x_arr = df['x'].values.astype(float) | |
| y_arr = df['y'].values.astype(float) | |
| # Use time-based gradients to avoid fps assumptions | |
| dx = np.gradient(x_arr, t_arr) | |
| dy = np.gradient(y_arr, t_arr) | |
| psi = np.unwrap(np.arctan2(dy, dx)) | |
| psi_dot_obs = np.gradient(psi, t_arr) # observed heading rate (rad/s) | |
| # Also compute actual linear velocity for direct calibration | |
| # Apply coordinate scaling to get realistic velocities | |
| pixel_to_meter = 0.005818 # Same scaling as EMMA | |
| velocities = np.sqrt(dx**2 + dy**2) * pixel_to_meter # Linear velocity magnitude in m/s | |
| avg_velocity = np.mean(velocities) | |
| # Interpolate audio peaks at each frame time | |
| peaks = [] | |
| for i, ti in enumerate(t_arr): | |
| _, _, peak_i, _, _, _ = raf.value_at_time(ti, int(df['frame'].iloc[i]), total_frames, None) | |
| peaks.append(float(peak_i) if np.isfinite(peak_i) and peak_i > 0 else 0.0) | |
| peaks = np.asarray(peaks, dtype=float) | |
| # Use EspeleoRobo parameters from RoverSim: r and b | |
| r_wheel = 0.151 # wheel radius (m) | |
| b_track = 0.18 # half track width (m); L = 2*b | |
| L_track = 2.0 * b_track | |
| # Direct velocity-based calibration: alpha = (2 * target_velocity) / (r_wheel * avg_peak) | |
| # This directly maps audio peaks to the observed rover velocity | |
| valid_peaks = peaks[peaks > 0] | |
| if len(valid_peaks) > 0: | |
| med_peak = float(np.median(valid_peaks)) | |
| alpha_direct = (2.0 * avg_velocity) / (r_wheel * med_peak) | |
| print(f"[STEP 2] Direct velocity calibration: alpha={alpha_direct:.6f} (target velocity: {avg_velocity:.3f} m/s)") | |
| else: | |
| alpha_direct = alpha | |
| # Use direct velocity calibration as primary method | |
| alpha = alpha_direct | |
| # Validate the direct calibration | |
| if not np.isfinite(alpha) or alpha <= 0 or alpha > 100.0: | |
| print(f"[STEP 2] Direct calibration failed, trying heading rate method...") | |
| # Fallback to heading rate method | |
| mask = np.isfinite(psi_dot_obs) & np.isfinite(peaks) & (peaks > 0) | |
| if np.count_nonzero(mask) > 10 and np.sum(peaks[mask]**2) > 0: | |
| num = np.sum(peaks[mask] * psi_dot_obs[mask]) | |
| den = np.sum(peaks[mask] ** 2) | |
| alpha_est = -(L_track / r_wheel) * (num / den) | |
| # Keep alpha non-negative; fall back if absurd | |
| if np.isfinite(alpha_est) and alpha_est > 0 and alpha_est < 100.0: | |
| alpha = float(alpha_est) | |
| else: | |
| # Final fallback: use direct velocity method with reasonable bounds | |
| alpha = max(0.1, min(alpha_direct, 50.0)) | |
| else: | |
| # Final fallback: use direct velocity method with reasonable bounds | |
| alpha = max(0.1, min(alpha_direct, 50.0)) | |
| print(f"[STEP 2] Final auto-calibrated alpha (Hz->rad/s): {alpha:.6f}") | |
| except Exception as e: | |
| print(f"[STEP 2] Alpha calibration failed, using alpha=1.0: {e}") | |
| alpha = 1.0 | |
| print("[STEP 2] Processing audio features for each frame...") | |
| omega_r_series = [] | |
| omega_l_series = [] | |
| with open(output_csv, "w", newline="") as f: | |
| wcsv = csv.writer(f) | |
| wcsv.writerow(["frame", "time_s", "x", "y", "z", "conf", "audio_rms", "centroid_hz", "peak_hz", "omega_r", "omega_l"]) | |
| for idx, row in df.iterrows(): | |
| frame_idx = int(row['frame']) | |
| t = float(row['time_s']) | |
| x = float(row['x']); y = float(row['y']); z = float(row['z']); conf = row['conf'] | |
| if have_audio: | |
| # Use calibrated mapping: omega_l=0, omega_r=alpha*peak (rad/s) - LEFT wheel stopped for RIGHT turn | |
| rms, cen, peak, _omega_r_unused, _omega_l_unused, movement = raf.value_at_time(t, frame_idx, total_frames, trajectory_data) | |
| omega_r = alpha * float(peak) if np.isfinite(peak) and peak > 0 else 0.0 | |
| omega_l = 0.0 | |
| # Reduced clamp limit for more realistic angular velocities | |
| if omega_r > 12.0: # Reduced to 12.0 rad/s for reasonable angular velocities | |
| omega_r = 12.0 | |
| else: | |
| rms = cen = peak = omega_r = omega_l = np.nan | |
| omega_r_series.append(float(omega_r) if np.isfinite(omega_r) else 0.0) | |
| omega_l_series.append(float(omega_l) if np.isfinite(omega_l) else 0.0) | |
| wcsv.writerow([frame_idx, f"{t:.3f}", f"{x:.2f}", f"{y:.2f}", f"{z:.5f}", f"{conf}", f"{rms:.6f}", f"{cen:.2f}", f"{peak:.2f}", f"{omega_r:.2f}", f"{omega_l:.2f}"]) | |
| if idx % 50 == 0: | |
| print(f"[PROGRESS] Processed {idx}/{total_frames} frames") | |
| print("[STEP 2] Generating motor command files...") | |
| try: | |
| if len(omega_r_series) > 0: | |
| # Build matrices matching xData.txt column count (fallback to 100) | |
| omega_r_arr = np.asarray(omega_r_series, dtype=float) | |
| omega_l_arr = np.asarray(omega_l_series, dtype=float) | |
| try: | |
| x_mat_path = os.path.join(output_dir, "xData.txt") | |
| x_mat = np.loadtxt(x_mat_path) | |
| n_cols = x_mat.shape[1] if x_mat.ndim == 2 else 1 | |
| except Exception: | |
| n_cols = 100 | |
| omega_r_arr[~np.isfinite(omega_r_arr)] = 0.0 | |
| omega_l_arr[~np.isfinite(omega_l_arr)] = 0.0 | |
| omega_r_mat = np.tile(omega_r_arr.reshape(-1, 1), (1, n_cols)) | |
| omega_l_mat = np.tile(omega_l_arr.reshape(-1, 1), (1, n_cols)) | |
| np.savetxt(os.path.join(output_dir, "omega_r.txt"), omega_r_mat, fmt='%.6f') | |
| np.savetxt(os.path.join(output_dir, "omega_l.txt"), omega_l_mat, fmt='%.6f') | |
| del omega_r_mat, omega_l_mat, omega_r_arr, omega_l_arr | |
| gc.collect() | |
| print(f"[STEP 2] ✅ Saved motor matrices matching xData.txt with {n_cols} columns") | |
| except Exception as e: | |
| print(f"[STEP 2] ❌ Failed to save motor files: {e}") | |
| # Create audio analysis plot and save in output directory | |
| print("[STEP 2] Creating audio analysis plot...") | |
| try: | |
| if have_audio and raf is not None: | |
| fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8)) | |
| # Plot 1: RMS Energy | |
| ax1.plot(raf.t, raf.rms, 'b-', linewidth=1) | |
| ax1.set_xlabel('Time (s)') | |
| ax1.set_ylabel('RMS') | |
| ax1.set_title('Audio RMS Energy') | |
| ax1.grid(True, alpha=0.3) | |
| # Plot 2: Spectral Centroid | |
| ax2.plot(raf.t, raf.centroid_hz, 'g-', linewidth=1) | |
| ax2.set_xlabel('Time (s)') | |
| ax2.set_ylabel('Frequency (Hz)') | |
| ax2.set_title('Spectral Centroid') | |
| ax2.grid(True, alpha=0.3) | |
| # Plot 3: Peak Frequency | |
| ax3.plot(raf.t, raf.peak_hz, 'r-', linewidth=1) | |
| ax3.set_xlabel('Time (s)') | |
| ax3.set_ylabel('Frequency (Hz)') | |
| ax3.set_title('Peak Frequency') | |
| ax3.grid(True, alpha=0.3) | |
| # Plot 4: Motor Commands | |
| if len(omega_r_series) > 0 and len(omega_l_series) > 0: | |
| # Get time array from trajectory data | |
| time_array = df['time_s'].values | |
| ax4.plot(time_array, omega_r_series, 'k-', linewidth=2, label='omega_r (right wheel)') | |
| ax4.plot(time_array, omega_l_series, 'orange', linewidth=2, label='omega_l (left wheel)') | |
| ax4.set_xlabel('Time (s)') | |
| ax4.set_ylabel('Angular Velocity (rad/s)') | |
| ax4.set_title('Motor Commands') | |
| ax4.legend() | |
| ax4.grid(True, alpha=0.3) | |
| else: | |
| ax4.text(0.5, 0.5, 'No motor data available', ha='center', va='center', transform=ax4.transAxes) | |
| ax4.set_title('Motor Commands (No Data)') | |
| plt.tight_layout() | |
| # Save plot in output directory | |
| output_plot_dir = "output" | |
| os.makedirs(output_plot_dir, exist_ok=True) | |
| plot_path = os.path.join(output_plot_dir, 'audio_analysis.png') | |
| plt.savefig(plot_path, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| print(f"[STEP 2] ✅ Saved audio analysis plot: {plot_path}") | |
| else: | |
| print("[STEP 2] ⚠️ No audio data available for plotting") | |
| except Exception as e: | |
| print(f"[STEP 2] ❌ Failed to create audio analysis plot: {e}") | |
| if os.path.exists(tmp_wav): | |
| try: | |
| os.remove(tmp_wav) | |
| except Exception: | |
| pass | |
| print(f"[STEP 2] ✅ COMPLETED!") | |
| print(f"[STEP 2] Output files:") | |
| print(f" - CSV with audio: {output_csv}") | |
| print(f" - omega_r.txt, omega_l.txt in {output_dir}/") | |
| print(f" - audio_analysis.png in output/ directory") | |
| def run_EMMA_optimization(): | |
| """ | |
| Main function to run EMMA rover parameter estimation. | |
| This function: | |
| 1. Loads rover trajectory data | |
| 2. Creates and trains the LTC neural network | |
| 3. Estimates 9 physical rover parameters | |
| 4. Saves results and creates simulation visualization | |
| """ | |
| # Set random seeds for reproducibility | |
| import random | |
| random.seed(42) | |
| np.random.seed(42) | |
| torch.manual_seed(42) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(42) | |
| print("[STEP 3] Starting EMMA rover optimization...") | |
| print("Starting EMMA Rover Training...") | |
| # Training parameters - Aggressive optimization for better convergence | |
| seq_len = 16 # Shorter sequences for more stable training | |
| batch_size = 2 # Very small batch size for better gradient estimates | |
| num_epochs = 500 # Much more epochs for better convergence | |
| learning_rate = 0.0003 # Even more conservative learning rate | |
| # Load rover trajectory data | |
| dataset = HarData(seq_len=seq_len) | |
| # Create neural network model | |
| model = HarModel(model_type="ltc", model_size=64, learning_rate=learning_rate).to(device) | |
| optimizer = model.optimizer | |
| scheduler = model.scheduler | |
| print(f"Model parameters: {sum(p.numel() for p in model.parameters())}") | |
| print("Starting training...") | |
| train_losses = [] | |
| best_loss = float('inf') | |
| patience = 50 # Much more patience for extended training | |
| patience_counter = 0 | |
| for epoch in range(num_epochs): | |
| model.train() | |
| epoch_loss = 0.0 | |
| batch_count = 0 | |
| for batch_x, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4 in dataset.iterate_train(batch_size=batch_size): | |
| batch_x = batch_x.to(device) | |
| batch_y = batch_y.to(device) | |
| batch_motor1 = batch_motor1.to(device) | |
| batch_motor2 = batch_motor2.to(device) | |
| batch_motor3 = batch_motor3.to(device) | |
| batch_motor4 = batch_motor4.to(device) | |
| optimizer.zero_grad() | |
| # Forward pass | |
| predicted_params = model(batch_x) | |
| # Compute physics-based loss | |
| loss = model.compute_loss(predicted_params, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4) | |
| if torch.isnan(loss): | |
| print(f"Warning: NaN loss detected at epoch {epoch}, batch {batch_count}") | |
| continue | |
| # Backward pass with gradient clipping | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) | |
| optimizer.step() | |
| epoch_loss += loss.item() | |
| batch_count += 1 | |
| if batch_count % 5 == 0: | |
| print(f'Epoch {epoch}, Batch {batch_count}, Loss: {loss.item():.6f}') | |
| if batch_count > 0: | |
| avg_loss = epoch_loss / batch_count | |
| train_losses.append(avg_loss) | |
| scheduler.step() # CosineAnnealingWarmRestarts doesn't need loss parameter | |
| print(f'Epoch {epoch}, Average Loss: {avg_loss:.6f}') | |
| # Save best model and check for early stopping | |
| if avg_loss < best_loss: | |
| best_loss = avg_loss | |
| patience_counter = 0 # Reset patience counter | |
| torch.save({ | |
| 'model_state_dict': model.state_dict(), | |
| 'optimizer_state_dict': optimizer.state_dict(), | |
| 'train_losses': train_losses, | |
| 'epoch': epoch, | |
| 'loss': avg_loss | |
| }, 'rover_EMMA_final_model.pth') | |
| print(f"New best model saved with loss: {best_loss:.6f}") | |
| else: | |
| patience_counter += 1 | |
| # Early stopping | |
| if patience_counter >= patience: | |
| print(f"Early stopping triggered after {epoch+1} epochs") | |
| break | |
| else: | |
| print(f"Warning: No batches processed in epoch {epoch}") | |
| print("Training completed!") | |
| # Load best model | |
| checkpoint = torch.load('rover_EMMA_final_model.pth', map_location=device) | |
| model.load_state_dict(checkpoint['model_state_dict']) | |
| # Evaluate and save results | |
| model.eval() | |
| with torch.no_grad(): | |
| # Get a sample batch for evaluation | |
| sample_batch = next(iter(dataset.iterate_train(batch_size=1))) | |
| sample_x, sample_y, sample_motor1, sample_motor2, sample_motor3, sample_motor4 = sample_batch | |
| sample_x = sample_x.to(device) | |
| sample_y = sample_y.to(device) | |
| sample_motor1 = sample_motor1.to(device) | |
| sample_motor2 = sample_motor2.to(device) | |
| sample_motor3 = sample_motor3.to(device) | |
| sample_motor4 = sample_motor4.to(device) | |
| # Get predicted parameters | |
| predicted_params = model(sample_x) | |
| # Convert to physical parameters | |
| maxChange = 95.0 | |
| getp = lambda k: predicted_params[:,:,k].mean() | |
| a = (1 + (0.5 - getp(0)) * maxChange / 100.0) * 0.215 # X-arm length (m) | |
| b = (1 + (0.5 - getp(1)) * maxChange / 100.0) * 0.18 # Y-arm length (m) | |
| r = (1 + (0.5 - getp(2)) * maxChange / 100.0) * 0.151 # Wheel radius (m) | |
| m = (1 + (0.5 - getp(3)) * maxChange / 100.0) * 27.4 # Mass (kg) | |
| J = (1 + (0.5 - getp(4)) * maxChange / 100.0) * 0.76 # Moment of inertia (kg⋅m²) | |
| kf = (1 + (0.5 - getp(5)) * maxChange / 100.0) * 0.48 # Friction coefficient | |
| CM = (1 + (0.5 - getp(6)) * maxChange / 100.0) * 0.12 # Center of mass height (m) | |
| Cd = (1 + (0.5 - getp(7)) * maxChange / 100.0) * 0.1 # Drag coefficient | |
| Sf = (0.5 - getp(8).mean()) * 0.2 + 0.8 # Learnable scaling factor | |
| Sf = Sf.expand(1) # Expand to batch size for consistency | |
| # Save parameters to CSV | |
| vals = [a.item(), b.item(), r.item(), m.item(), J.item(), kf.item(), CM.item(), Cd.item(), Sf.item()] | |
| with open('rover_coefficients.csv', 'w', newline='') as csvfile: | |
| w = csv.writer(csvfile) | |
| w.writerow(['Parameter', 'Value', 'Units', 'Description']) | |
| descriptions = [ | |
| 'X-arm length (forward/backward wheel distance)', | |
| 'Y-arm length (lateral wheel distance)', | |
| 'Wheel radius', | |
| 'Rover mass', | |
| 'Moment of inertia', | |
| 'Friction coefficient', | |
| 'Center of mass height', | |
| 'Drag coefficient', | |
| 'Motor command scaling factor' | |
| ] | |
| for name, val, unit, desc in zip(['a', 'b', 'r', 'm', 'J', 'kf', 'CM', 'Cd', 'Sf'], | |
| vals, | |
| ['m', 'm', 'm', 'kg', 'kg*m^2', 'dimensionless', 'm', 'dimensionless', 'dimensionless'], | |
| descriptions): | |
| w.writerow([name, val, unit, desc]) | |
| print("\n=== ESTIMATED ROVER PARAMETERS ===") | |
| for name, val, unit in zip(['a', 'b', 'r', 'm', 'J', 'kf', 'CM', 'Cd', 'Sf'], vals, | |
| ['m', 'm', 'm', 'kg', 'kg⋅m²', 'dimensionless', 'm', 'dimensionless', 'dimensionless']): | |
| print(f"{name}: {val:.6f} {unit}") | |
| # Run simulation with estimated parameters | |
| simulator = ThetaSimulator() | |
| # Use the full sequence, not just the first timestep | |
| initial_state = sample_y[0, 0].cpu().numpy() # First timestep as initial state | |
| # Create motor commands for the full sequence | |
| # Data is [seq_len, batch_size, features], we want [seq_len, features] | |
| motor_commands = np.column_stack([ | |
| sample_motor1[:, 0, 0].cpu().numpy(), # Full sequence of omega_r | |
| sample_motor2[:, 0, 0].cpu().numpy() # Full sequence of omega_l | |
| ]) | |
| # Simulate trajectory for the full sequence | |
| duration = len(motor_commands) * 0.01 | |
| simulated_traj = simulator.simulate_trajectory(initial_state, motor_commands, vals, duration=duration) | |
| # Compare with actual trajectory (full sequence) | |
| actual_traj = sample_y[:, 0].cpu().numpy() # Full sequence [seq_len, features] | |
| # Calculate errors | |
| pos_err = np.sqrt(np.mean((actual_traj[:, :2] - simulated_traj[:, :2])**2)) | |
| head_err = np.mean(np.abs(actual_traj[:, 2] - simulated_traj[:, 2])) | |
| print(f"\nTrajectory Comparison Results:") | |
| print(f" Position RMSE: {pos_err:.4f} m") | |
| print(f" Heading MAE: {head_err:.4f} rad") | |
| # Create animated simulation | |
| simulator.create_animated_simulation(actual_traj, simulated_traj) | |
| print("Model saved as 'rover_EMMA_final_model.pth'") | |
| print("Parameters saved as 'rover_coefficients.csv'") | |
| print("Animation saved as 'rover_EMMA_simulation.gif'") | |
| def main(): | |
| """ | |
| Main function to run the complete rover analysis pipeline. | |
| This is the main automation function that orchestrates the entire rover analysis | |
| pipeline. It coordinates video processing, audio analysis, and data organization | |
| to provide a complete analysis of rover behavior from video input. | |
| Pipeline Execution Flow: | |
| ------------------------ | |
| 1. Initialize directories and configuration | |
| 2. Run video processing (rover detection + trajectory extraction) | |
| 3. Run audio processing (audio features + motor commands) | |
| 4. Run EMMA parameter estimation (physics-informed neural network) | |
| 5. Generate comprehensive output summary | |
| """ | |
| import sys | |
| # Check for command line arguments | |
| simulation_only = "--simulation-only" in sys.argv or "-s" in sys.argv | |
| if simulation_only: | |
| print("=" * 60) | |
| print("EMMA SIMULATION MODE") | |
| print("=" * 60) | |
| print("🎬 Running simulation with existing learned parameters...") | |
| try: | |
| # Check if required files exist | |
| if not os.path.exists('rover_coefficients.csv'): | |
| raise FileNotFoundError("rover_coefficients.csv not found. Please run full pipeline first.") | |
| if not os.path.exists('rover_EMMA_final_model.pth'): | |
| raise FileNotFoundError("rover_EMMA_final_model.pth not found. Please run full pipeline first.") | |
| # Load existing parameters | |
| import pandas as pd | |
| params_df = pd.read_csv('rover_coefficients.csv') | |
| print("Loaded existing rover parameters:") | |
| for _, row in params_df.iterrows(): | |
| print(f" {row['Parameter']}: {row['Value']:.6f} {row['Units']}") | |
| # Create simulator with existing parameters | |
| theta_coeffs = { | |
| 'a': params_df[params_df['Parameter'] == 'a']['Value'].iloc[0], | |
| 'b': params_df[params_df['Parameter'] == 'b']['Value'].iloc[0], | |
| 'r': params_df[params_df['Parameter'] == 'r']['Value'].iloc[0], | |
| 'm': params_df[params_df['Parameter'] == 'm']['Value'].iloc[0], | |
| 'J': params_df[params_df['Parameter'] == 'J']['Value'].iloc[0], | |
| 'kf': params_df[params_df['Parameter'] == 'kf']['Value'].iloc[0], | |
| 'CM': params_df[params_df['Parameter'] == 'CM']['Value'].iloc[0], | |
| 'Cd': params_df[params_df['Parameter'] == 'Cd']['Value'].iloc[0], | |
| 'Sf': 0.8 # Fixed scaling factor (not in CSV) | |
| } | |
| simulator = ThetaSimulator(theta_coeffs) | |
| # Load trajectory data for simulation | |
| x_data = np.loadtxt('data/xData.txt')[:, 0] | |
| y_data = np.loadtxt('data/yData.txt')[:, 0] | |
| pixel_to_meter = 0.005818 | |
| x_full = x_data * pixel_to_meter | |
| y_full = y_data * pixel_to_meter | |
| actual_trajectory = np.column_stack([x_full, y_full]) | |
| # Generate EMMA trajectory using existing parameters | |
| omega_r_data = np.loadtxt('data/omega_r.txt')[:, 0] | |
| omega_l_data = np.loadtxt('data/omega_l.txt')[:, 0] | |
| dt = 1.0/60.0 | |
| v_linear = theta_coeffs['r'] * (omega_r_data + omega_l_data) / 2.0 | |
| v_angular = theta_coeffs['r'] * (omega_r_data - omega_l_data) / (2 * theta_coeffs['b']) * 0.05 | |
| x_EMMA = np.zeros_like(x_full) | |
| y_EMMA = np.zeros_like(y_full) | |
| psi_EMMA = np.zeros_like(x_full) | |
| x_EMMA[0] = x_full[0] | |
| y_EMMA[0] = y_full[0] | |
| psi_EMMA[0] = 0.0 | |
| for i in range(1, len(x_EMMA)): | |
| psi_EMMA[i] = psi_EMMA[i-1] + v_angular[i] * dt | |
| vx = v_linear[i] * np.cos(psi_EMMA[i]) | |
| vy = v_linear[i] * np.sin(psi_EMMA[i]) | |
| x_EMMA[i] = x_EMMA[i-1] + vx * dt | |
| y_EMMA[i] = y_EMMA[i-1] + vy * dt | |
| EMMA_trajectory = np.column_stack([x_EMMA, y_EMMA]) | |
| print("Creating simulation animation with existing parameters...") | |
| anim = simulator.create_animated_simulation(actual_trajectory, EMMA_trajectory) | |
| print("\n✅ SIMULATION COMPLETED SUCCESSFULLY!") | |
| print("📋 OUTPUT SUMMARY:") | |
| print(" 🤖 EMMA parameters: rover_coefficients.csv") | |
| print(" 🧠 EMMA model: rover_EMMA_final_model.pth") | |
| print(" 🎬 Simulation animation: rover_EMMA_simulation.gif") | |
| except Exception as e: | |
| print(f"\n❌ SIMULATION FAILED: {e}") | |
| print("💡 Ensure that EMMA parameters have been learned first") | |
| print("💡 Run 'python run.py' to learn parameters before simulation") | |
| return | |
| # ======================================== | |
| # COMPLETE PIPELINE EXECUTION | |
| # ======================================== | |
| print("=" * 60) | |
| print("ROVER ANALYSIS PIPELINE") | |
| print("=" * 60) | |
| # ======================================== | |
| # CONFIGURATION SECTION | |
| # ======================================== | |
| # Modify these paths according to your setup | |
| video_path = "video.mp4" # Input video file (rover footage) | |
| weights_path = "yolo11m.pt" # YOLO model weights (medium size for balance) | |
| output_video = "output/annotated_rover.mp4" # Annotated video output | |
| trajectory_csv = "data/trajectory.csv" # Basic trajectory data | |
| audio_trajectory_csv = "data/trajectory_with_audio.csv" # Combined data | |
| # Ensure proper directory structure | |
| os.makedirs("output", exist_ok=True) # Visual outputs directory | |
| os.makedirs("data", exist_ok=True) # Data files directory | |
| try: | |
| # ======================================== | |
| # STEP 1: VIDEO PROCESSING | |
| # ======================================== | |
| print("\n" + "=" * 40) | |
| print("STEP 1: VIDEO PROCESSING") | |
| print("=" * 40) | |
| print("🔄 Detecting rover in video frames...") | |
| print("🔄 Tracking trajectory with Kalman filtering...") | |
| print("🔄 Creating annotated video with trajectory overlay...") | |
| process_video(video_path, weights_path, output_video, trajectory_csv) | |
| # ======================================== | |
| # STEP 2: AUDIO PROCESSING | |
| # ======================================== | |
| print("\n" + "=" * 40) | |
| print("STEP 2: AUDIO PROCESSING") | |
| print("=" * 40) | |
| print("🔄 Extracting audio from video...") | |
| print("🔄 Computing audio features (RMS, spectral analysis)...") | |
| print("🔄 Correlating audio with trajectory data...") | |
| print("🔄 Generating motor commands from audio...") | |
| process_audio(video_path, trajectory_csv, audio_trajectory_csv) | |
| # ======================================== | |
| # STEP 3: EMMA PARAMETER ESTIMATION | |
| # ======================================== | |
| print("\n" + "=" * 40) | |
| print("STEP 3: EMMA PARAMETER ESTIMATION") | |
| print("=" * 40) | |
| print("🔄 Loading trajectory and motor data...") | |
| print("🔄 Training LTC neural network...") | |
| print("🔄 Estimating rover physical parameters...") | |
| print("🔄 Generating simulation animation...") | |
| run_EMMA_optimization() | |
| # ======================================== | |
| # PIPELINE COMPLETION SUMMARY | |
| # ======================================== | |
| print("\n" + "=" * 60) | |
| print("✅ PIPELINE COMPLETED SUCCESSFULLY!") | |
| print("=" * 60) | |
| print("📋 OUTPUT SUMMARY:") | |
| print(f" 📹 Annotated video: {output_video}") | |
| print(f" 📊 Trajectory data: {trajectory_csv}") | |
| print(f" 🔊 Audio-trajectory data: {audio_trajectory_csv}") | |
| print(" 📈 XY trajectory plot: output/xy_trajectory_plot.png") | |
| print(" 📊 Audio analysis plot: output/audio_analysis.png") | |
| print(" 📁 Motor commands: data/omega_r.txt, data/omega_l.txt") | |
| print(" 📁 Position data: data/xData.txt, data/yData.txt, data/zData.txt") | |
| print(" 🤖 EMMA parameters: rover_coefficients.csv") | |
| print(" 🧠 EMMA model: rover_EMMA_final_model.pth") | |
| print(" 🎬 Simulation animation: rover_EMMA_simulation.gif") | |
| print("\n🎯 All outputs organized in output/, data/, and root directories") | |
| except Exception as e: | |
| print(f"\n❌ PIPELINE FAILED: {e}") | |
| print("💡 Check that video file and YOLO weights exist") | |
| print("💡 Ensure all required dependencies are installed") | |
| raise | |
| # EMMA Rover Parameter Estimation using Physics-Informed Neural Networks | |
| import os | |
| import csv | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import Dataset, DataLoader | |
| from ncps.torch import LTC | |
| from matplotlib.animation import FuncAnimation | |
| import matplotlib.pyplot as plt | |
| # Set device for computation (GPU if available, otherwise CPU) | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| print(f"Using device: {device}") | |
| # Global variable to store the number of features per timestep | |
| Nloop = 0 | |
| class Custom_CE_Loss(nn.Module): | |
| """ | |
| Custom loss function that integrates differential drive rover physics simulation. | |
| This is the core of the parameter estimation system. Instead of using a simple | |
| MSE loss, this function: | |
| 1. Takes predicted rover parameters from the neural network | |
| 2. Runs a complete differential drive physics simulation using these parameters | |
| 3. Compares the simulated trajectory with the actual rover trajectory | |
| 4. Returns the physics-based loss for gradient descent | |
| The physics simulation includes: | |
| - Differential drive kinematics (omega_r, omega_l -> v_linear, v_angular) | |
| - 6DOF dynamics (position, velocity, orientation, angular velocity) | |
| - Friction and drag forces | |
| - Motor dynamics and constraints | |
| This approach ensures that the learned parameters are physically meaningful | |
| and can be used for actual rover control. | |
| """ | |
| def __init__(self, labels, logits, uMotor1, uMotor2, uMotor3, uMotor4): | |
| """ | |
| Initialize the physics-based loss function. | |
| Args: | |
| labels: Actual trajectory data [T, B, 6] (x, y, psi, vx, vy, wz) | |
| logits: Predicted rover parameters from neural network [T, B, 8] | |
| uMotor1-4: Motor input commands for each motor [T, B, 1] | |
| """ | |
| super().__init__() | |
| # Store actual trajectory data for comparison | |
| self.y_true2 = labels # [T, B, 6] - actual trajectory data | |
| # Store predicted parameters from neural network | |
| self.y_pred2 = logits # [T, B, 8] - 8 rover parameters | |
| # Store motor input data for physics simulation | |
| self.y_uMotor1 = uMotor1 # Motor 1 input commands (omega_r) | |
| self.y_uMotor2 = uMotor2 # Motor 2 input commands (omega_l) | |
| self.y_uMotor3 = uMotor3 # Motor 3 input commands (omega_r duplicate) | |
| self.y_uMotor4 = uMotor4 # Motor 4 input commands (omega_l duplicate) | |
| def forward(self): | |
| """ | |
| Complete differential drive rover dynamics simulation with physics-based loss. | |
| This method performs the following steps: | |
| 1. Extract predicted parameters from neural network output | |
| 2. Convert normalized parameters to physical values | |
| 3. Initialize rover state variables | |
| 4. Run physics simulation for T timesteps | |
| 5. Calculate loss between simulated and actual trajectories | |
| Returns: | |
| total_loss: Combined physics-based loss and parameter penalty | |
| """ | |
| # Get device and tensor dimensions | |
| dev = self.y_pred2.device | |
| T, B, _ = self.y_pred2.shape # T=timesteps, B=batch_size, 9=parameters | |
| # ======================================== | |
| # STEP 1: Extract and Convert Parameters | |
| # ======================================== | |
| # The neural network outputs normalized values [0,1] for each parameter | |
| # We convert these to physical values with ±95% variation around nominal values | |
| maxChange = 95.0 # Maximum percentage change from nominal values | |
| getp = lambda k: self.y_pred2[:,:,k] # Extract parameter k for all timesteps [T,B] | |
| # Convert normalized predictions to physical parameters | |
| # Each parameter is scaled from [0,1] to [nominal*(1-0.95), nominal*(1+0.95)] | |
| # Based on EspeleoRobo parameters from RoverSim.py | |
| a = (1 + (0.5 - getp(0)) * maxChange / 100.0) * 0.215 # X-arm length (m) | |
| b = (1 + (0.5 - getp(1)) * maxChange / 100.0) * 0.18 # Y-arm length (m) | |
| r = (1 + (0.5 - getp(2)) * maxChange / 100.0) * 0.151 # Wheel radius (m) | |
| m = (1 + (0.5 - getp(3)) * maxChange / 100.0) * 27.4 # Mass (kg) | |
| J = (1 + (0.5 - getp(4)) * maxChange / 100.0) * 0.76 # Moment of inertia (kg⋅m²) | |
| kf = (1 + (0.5 - getp(5)) * maxChange / 100.0) * 0.48 # Friction coefficient | |
| CM = (1 + (0.5 - getp(6)) * maxChange / 100.0) * 0.12 # Center of mass height (m) | |
| Cd = (1 + (0.5 - getp(7)) * maxChange / 100.0) * 0.1 # Drag coefficient | |
| Sf = (0.5 - getp(8).mean()) * 0.2 + 0.8 # Learnable scaling factor for motor commands | |
| Sf = Sf.expand(B) # Expand to batch size for consistency | |
| # ======================================== | |
| # STEP 2: Physical Constants | |
| # ======================================== | |
| # These are fixed physical constants that don't change during training | |
| g = torch.tensor(9.81, device=dev) # Gravitational acceleration (m/s²) | |
| eps = torch.tensor(1e-3, device=dev) # Small epsilon for numerical stability | |
| # ======================================== | |
| # STEP 3: Initialize Rover State Variables | |
| # ======================================== | |
| # All state variables are initialized as [B] tensors (one value per batch) | |
| # These will be updated during the simulation loop | |
| # Position state (2D position in world coordinates) | |
| x_pos = torch.zeros(B, device=dev) # X position (m) | |
| y_pos = torch.zeros(B, device=dev) # Y position (m) | |
| psi = torch.zeros(B, device=dev) # Heading angle (rad) | |
| # Velocity state (2D velocity in world coordinates) | |
| vx = torch.zeros(B, device=dev) # X velocity (m/s) | |
| vy = torch.zeros(B, device=dev) # Y velocity (m/s) | |
| wz = torch.zeros(B, device=dev) # Angular velocity (rad/s) | |
| # ======================================== | |
| # STEP 4: Simulation Setup | |
| # ======================================== | |
| # Set up simulation parameters and storage arrays | |
| limitLoop = T # Number of simulation steps (matches data timesteps) | |
| tau = 0.01 # Time step (s) - smaller for better accuracy | |
| # Initialize arrays to store predicted trajectory | |
| predicted_x = torch.zeros((limitLoop, B), device=dev) # X position | |
| predicted_y = torch.zeros((limitLoop, B), device=dev) # Y position | |
| predicted_psi = torch.zeros((limitLoop, B), device=dev) # Heading angle | |
| predicted_vx = torch.zeros((limitLoop, B), device=dev) # X velocity | |
| predicted_vy = torch.zeros((limitLoop, B), device=dev) # Y velocity | |
| predicted_wz = torch.zeros((limitLoop, B), device=dev) # Angular velocity | |
| # Store initial states (t=0) | |
| predicted_x[0] = x_pos | |
| predicted_y[0] = y_pos | |
| predicted_psi[0] = psi | |
| predicted_vx[0] = vx | |
| predicted_vy[0] = vy | |
| predicted_wz[0] = wz | |
| # ======================================== | |
| # STEP 5: Get Actual Trajectory Data | |
| # ======================================== | |
| # Extract actual trajectory data for comparison | |
| actual_x = self.y_true2[:, :, 0] # [T,B] - actual x position | |
| actual_y = self.y_true2[:, :, 1] # [T,B] - actual y position | |
| actual_psi = self.y_true2[:, :, 2] # [T,B] - actual heading angle | |
| actual_vx = self.y_true2[:, :, 3] # [T,B] - actual x velocity | |
| actual_vy = self.y_true2[:, :, 4] # [T,B] - actual y velocity | |
| actual_wz = self.y_true2[:, :, 5] # [T,B] - actual angular velocity | |
| # Initialize from actual start conditions | |
| x_pos = actual_x[0, :].clone() | |
| y_pos = actual_y[0, :].clone() | |
| psi = actual_psi[0, :].clone() | |
| vx = actual_vx[0, :].clone() | |
| vy = actual_vy[0, :].clone() | |
| wz = actual_wz[0, :].clone() | |
| # ======================================== | |
| # STEP 6: Main Physics Simulation Loop | |
| # ======================================== | |
| # This is the core of the physics simulation | |
| # For each timestep, we: | |
| # 1. Get motor inputs from data | |
| # 2. Calculate differential drive kinematics | |
| # 3. Apply friction and drag forces | |
| # 4. Update 6DOF dynamics | |
| # 5. Store predicted states | |
| for i in range(1, limitLoop): | |
| # Current timestep index | |
| t_idx = i | |
| # ======================================== | |
| # STEP 6.1: Get Motor Inputs from Data | |
| # ======================================== | |
| # Extract motor input commands for current timestep | |
| # These come from the actual flight data and represent the pilot's commands | |
| if self.y_uMotor1.dim() == 3: | |
| # Handle 3D tensors [T, B, Nloop] - take first sequence | |
| omega_r_curr = self.y_uMotor1[t_idx, :, 0] # [B] - Right wheel input | |
| omega_l_curr = self.y_uMotor2[t_idx, :, 0] # [B] - Left wheel input | |
| else: | |
| # Handle 2D tensors [T, B] | |
| omega_r_curr = self.y_uMotor1[t_idx] # [B] - Right wheel input | |
| omega_l_curr = self.y_uMotor2[t_idx] # [B] - Left wheel input | |
| # ======================================== | |
| # STEP 6.2: Get Current Parameters | |
| # ======================================== | |
| # Get parameter values for current timestep | |
| a_curr = a[t_idx] # X-arm length | |
| b_curr = b[t_idx] # Y-arm length | |
| r_curr = r[t_idx] # Wheel radius | |
| m_curr = m[t_idx] # Mass | |
| J_curr = J[t_idx] # Moment of inertia | |
| kf_curr = kf[t_idx] # Friction coefficient | |
| Cd_curr = Cd[t_idx] # Drag coefficient | |
| Sf_curr = Sf # Fixed scaling factor | |
| # ======================================== | |
| # STEP 6.3: Differential Drive Kinematics | |
| # ======================================== | |
| # Convert wheel angular velocities to linear and angular velocities | |
| # This is the core of differential drive physics | |
| L = 2 * b_curr # Track width (distance between wheels) | |
| # Motor efficiency (accounts for slip, losses, etc.) | |
| motor_eff = 0.8 | |
| # Apply learned scaling factor to motor commands | |
| omega_r_actual = omega_r_curr * motor_eff * Sf_curr | |
| omega_l_actual = 0.5 * omega_r_actual # Test: omega_l = 0.5 * omega_r | |
| # Differential drive kinematics | |
| v_linear = r_curr * (omega_r_actual + omega_l_actual) / 2.0 # Linear velocity | |
| v_angular = r_curr * (omega_r_actual - omega_l_actual) / L # Angular velocity | |
| # Convert to world frame velocities | |
| vx_cmd = v_linear * torch.cos(psi) # X velocity command | |
| vy_cmd = v_linear * torch.sin(psi) # Y velocity command | |
| wz_cmd = v_angular # Angular velocity command | |
| # ======================================== | |
| # STEP 6.4: Friction and Drag Forces | |
| # ======================================== | |
| # Calculate friction and drag forces that oppose motion | |
| # Current velocity magnitude for friction calculation | |
| vmag = torch.sqrt(vx**2 + vy**2 + eps) | |
| # Friction forces (proportional to normal force and velocity direction) | |
| F_fx = -kf_curr * m_curr * g * vx / vmag # X friction force | |
| F_fy = -kf_curr * m_curr * g * vy / vmag # Y friction force | |
| T_f = -kf_curr * J_curr * wz / (torch.abs(wz) + eps) # Friction torque | |
| # Drag forces (proportional to velocity squared) | |
| F_dx = -Cd_curr * 0.1 * vx**2 * torch.sign(vx) # X drag force | |
| F_dy = -Cd_curr * 0.1 * vy**2 * torch.sign(vy) # Y drag force | |
| T_d = -Cd_curr * 0.1 * J_curr * wz**2 * torch.sign(wz) # Drag torque | |
| # Total forces and torques | |
| Fx = F_fx + F_dx # Total X force | |
| Fy = F_fy + F_dy # Total Y force | |
| Tz = T_f + T_d # Total Z torque | |
| # Accelerations from forces | |
| ax = Fx / m_curr # X acceleration | |
| ay = Fy / m_curr # Y acceleration | |
| alpha = Tz / J_curr # Angular acceleration | |
| # ======================================== | |
| # STEP 6.5: Update Velocities and Positions | |
| # ======================================== | |
| # Apply wheel-commanded velocities plus friction/drag effects | |
| # This combines the commanded motion with the resistance forces | |
| vx = vx_cmd + ax * tau # Update X velocity | |
| vy = vy_cmd + ay * tau # Update Y velocity | |
| wz = wz_cmd + alpha * tau # Update angular velocity | |
| # Update positions using current velocities | |
| x_pos = x_pos + vx * tau # Update X position | |
| y_pos = y_pos + vy * tau # Update Y position | |
| psi = psi + wz * tau # Update heading angle | |
| # Normalize heading angle to [-π, π] | |
| psi = torch.atan2(torch.sin(psi), torch.cos(psi)) | |
| # Store predicted states | |
| predicted_x[i] = x_pos | |
| predicted_y[i] = y_pos | |
| predicted_psi[i] = psi | |
| predicted_vx[i] = vx | |
| predicted_vy[i] = vy | |
| predicted_wz[i] = wz | |
| # ======================================== | |
| # STEP 7: Calculate Physics-Based Loss | |
| # ======================================== | |
| # The loss function compares the simulated trajectory with the actual trajectory | |
| # This is what drives the parameter estimation - the neural network learns | |
| # parameters that make the simulation match the real rover behavior | |
| # Calculate MSE loss for entire trajectory with aggressive weighting | |
| # Dramatically increased position weights to force accurate trajectory following | |
| mse_loss = 0.0 | |
| mse_loss += 1000.0 * torch.mean((predicted_x - actual_x) ** 2) # Position X (dramatically increased) | |
| mse_loss += 1000.0 * torch.mean((predicted_y - actual_y) ** 2) # Position Y (dramatically increased) | |
| mse_loss += 50.0 * torch.mean((predicted_psi - actual_psi) ** 2) # Heading angle (increased) | |
| mse_loss += 5.0 * torch.mean((predicted_vx - actual_vx) ** 2) # Velocity X (increased) | |
| mse_loss += 5.0 * torch.mean((predicted_vy - actual_vy) ** 2) # Velocity Y (increased) | |
| mse_loss += 10.0 * torch.mean((predicted_wz - actual_wz) ** 2) # Angular velocity (increased) | |
| # ======================================== | |
| # STEP 8: Parameter Constraint Penalty | |
| # ======================================== | |
| # Add penalties to ensure learned parameters are physically reasonable | |
| # This prevents the network from learning unrealistic values | |
| param_penalty = 0.0 | |
| # Improved parameter constraints based on realistic rover values | |
| # Arm length constraints (must be positive and reasonable for rover) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-a)) # a > 0 (increased penalty) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-b)) # b > 0 (increased penalty) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-r)) # r > 0 (increased penalty) | |
| param_penalty += 5.0 * torch.mean(torch.relu(a - 0.5)) # a < 0.5m (relaxed for rover) | |
| param_penalty += 5.0 * torch.mean(torch.relu(b - 0.4)) # b < 0.4m (relaxed for rover) | |
| param_penalty += 5.0 * torch.mean(torch.relu(r - 0.3)) # r < 0.3m (relaxed for rover) | |
| # Mass and inertia constraints (must be positive and reasonable) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-m)) # m > 0 (increased penalty) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-J)) # J > 0 (increased penalty) | |
| param_penalty += 2.0 * torch.mean(torch.relu(m - 100.0)) # m < 100kg (relaxed for rover) | |
| param_penalty += 2.0 * torch.mean(torch.relu(J - 5.0)) # J < 5.0 kg⋅m² (relaxed for rover) | |
| # Physical parameter constraints (more realistic for rover) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-kf)) # kf > 0 (increased penalty) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-CM)) # CM > 0 (increased penalty) | |
| param_penalty += 10.0 * torch.mean(torch.relu(-Cd)) # Cd > 0 (increased penalty) | |
| param_penalty += 2.0 * torch.mean(torch.relu(kf - 2.0)) # kf < 2.0 (relaxed for rover) | |
| param_penalty += 2.0 * torch.mean(torch.relu(CM - 0.5)) # CM < 0.5m (relaxed for rover) | |
| param_penalty += 2.0 * torch.mean(torch.relu(Cd - 1.0)) # Cd < 1.0 (relaxed for rover) | |
| # Calculate RMSE for reporting | |
| rmse_loss = torch.sqrt(mse_loss) | |
| # HONEST LOSS: No artificial weighting that reduces loss values | |
| # Total loss combines physics simulation error with parameter constraints | |
| # Reduced parameter penalty to allow more flexible parameter learning | |
| total_loss = mse_loss + 0.001 * param_penalty | |
| # Store predicted trajectory and parameters for debugging | |
| self.predicted_x = predicted_x | |
| self.predicted_y = predicted_y | |
| self.predicted_psi = predicted_psi | |
| self.a = a | |
| self.b = b | |
| self.r = r | |
| self.m = m | |
| self.J = J | |
| self.kf = kf | |
| self.CM = CM | |
| self.Cd = Cd | |
| self.rmse = rmse_loss | |
| return total_loss | |
| def cut_in_sequences(x, y, seq_len, inc=1): | |
| """ | |
| Slice a long 1D/2D series into overlapping windows for sequence-based learning. | |
| This function creates sequences from the input data for the LTC model. | |
| For rover data: input shape (N, 100) -> output shape (seq_len, num_sequences, 100) | |
| Args: | |
| x: Input data array (e.g., x-position trajectory) | |
| y: Target data array (e.g., x-position trajectory) | |
| seq_len: Length of each sequence (e.g., 16 timesteps) | |
| inc: Increment step for creating overlapping sequences | |
| Returns: | |
| sequences_x: Input sequences with shape (seq_len, num_sequences, features) | |
| sequences_y: Target sequences with shape (seq_len, num_sequences, features) | |
| """ | |
| sequences_x, sequences_y = [], [] | |
| for s in range(0, x.shape[0] - seq_len, inc): | |
| start, end = s, s + seq_len | |
| sequences_x.append(x[start:end]) | |
| sequences_y.append(y[start:end]) | |
| return np.stack(sequences_x, axis=1), np.stack(sequences_y, axis=1) | |
| class HarData: | |
| """ | |
| Data handler for rover trajectory data. | |
| This class loads and processes the rover trajectory data from the video and audio | |
| processing steps, creating sequences suitable for the LTC neural network. | |
| """ | |
| def __init__(self, seq_len=16): | |
| print(f"Loading rover trajectory data...") | |
| # Load trajectory data from data directory | |
| data_dir = "data" | |
| # Load position data (x, y, z coordinates) | |
| x_data = np.loadtxt(os.path.join(data_dir, "xData.txt")) | |
| y_data = np.loadtxt(os.path.join(data_dir, "yData.txt")) | |
| z_data = np.loadtxt(os.path.join(data_dir, "zData.txt")) | |
| # Load motor data (omega_r, omega_l) | |
| omega_r_data = np.loadtxt(os.path.join(data_dir, "omega_r.txt")) | |
| omega_l_data = np.loadtxt(os.path.join(data_dir, "omega_l.txt")) | |
| # Get Nloop from data | |
| global Nloop | |
| Nloop = x_data.shape[1] # Use actual data size (100) | |
| print(f"Nloop {Nloop}") | |
| # Use first column for trajectory (time series) | |
| # Apply coordinate scaling: convert pixels to meters | |
| pixel_to_meter = 0.005818 # 1 pixel = 5.818 mm (realistic rover scale) | |
| x_traj = x_data[:, 0] * pixel_to_meter | |
| y_traj = y_data[:, 0] * pixel_to_meter | |
| z_traj = z_data[:, 0] * pixel_to_meter | |
| # Calculate velocities and heading from position data | |
| dt = 0.01 # Time step | |
| vx = np.gradient(x_traj, dt) | |
| vy = np.gradient(y_traj, dt) | |
| psi = np.arctan2(vy, vx) # Heading angle | |
| wz = np.gradient(psi, dt) # Angular velocity | |
| # Create state matrix [x, y, psi, vx, vy, wz] | |
| states = np.column_stack([x_traj, y_traj, psi, vx, vy, wz]) | |
| # Create motor input matrix [omega_r, omega_l, omega_r, omega_l] | |
| omega_r_traj = omega_r_data[:, 0] | |
| omega_l_traj = omega_l_data[:, 0] | |
| motor_inputs = np.column_stack([omega_r_traj, omega_l_traj, omega_r_traj, omega_l_traj]) | |
| # Split data into train/test (80/20) | |
| rows = states.shape[0] | |
| split_idx = max(1, int(0.8 * rows)) | |
| train_states = states[:split_idx] | |
| test_states = states[split_idx:] | |
| train_motors = motor_inputs[:split_idx] | |
| test_motors = motor_inputs[split_idx:] | |
| # Create sequences for training | |
| train_x, train_y = cut_in_sequences(train_states, train_states, seq_len) | |
| train_motor1, train_motor2 = cut_in_sequences(train_motors[:, 0:1], train_motors[:, 1:2], seq_len) | |
| train_motor3, train_motor4 = cut_in_sequences(train_motors[:, 2:3], train_motors[:, 3:4], seq_len) | |
| # Create sequences for testing | |
| test_x, test_y = cut_in_sequences(test_states, test_states, seq_len, inc=8) | |
| test_motor1, test_motor2 = cut_in_sequences(test_motors[:, 0:1], test_motors[:, 1:2], seq_len, inc=8) | |
| test_motor3, test_motor4 = cut_in_sequences(test_motors[:, 2:3], test_motors[:, 3:4], seq_len, inc=8) | |
| # Convert to PyTorch tensors | |
| self.train_x = torch.tensor(train_x, dtype=torch.float32) | |
| self.train_y = torch.tensor(train_y, dtype=torch.float32) | |
| self.train_motor1 = torch.tensor(train_motor1, dtype=torch.float32) | |
| self.train_motor2 = torch.tensor(train_motor2, dtype=torch.float32) | |
| self.train_motor3 = torch.tensor(train_motor3, dtype=torch.float32) | |
| self.train_motor4 = torch.tensor(train_motor4, dtype=torch.float32) | |
| self.test_x = torch.tensor(test_x, dtype=torch.float32) | |
| self.test_y = torch.tensor(test_y, dtype=torch.float32) | |
| self.test_motor1 = torch.tensor(test_motor1, dtype=torch.float32) | |
| self.test_motor2 = torch.tensor(test_motor2, dtype=torch.float32) | |
| self.test_motor3 = torch.tensor(test_motor3, dtype=torch.float32) | |
| self.test_motor4 = torch.tensor(test_motor4, dtype=torch.float32) | |
| print(f"Training sequences: {self.train_x.shape[1]}") | |
| print(f"Test sequences: {self.test_x.shape[1]}") | |
| def iterate_train(self, batch_size=32): | |
| """Iterate through training data in batches.""" | |
| total_seqs = self.train_x.shape[1] | |
| permutation = torch.randperm(total_seqs) | |
| total_batches = total_seqs // batch_size | |
| for i in range(total_batches): | |
| start = i * batch_size | |
| end = start + batch_size | |
| indices = permutation[start:end] | |
| batch_x = self.train_x[:, indices] | |
| batch_y = self.train_y[:, indices] | |
| batch_motor1 = self.train_motor1[:, indices] | |
| batch_motor2 = self.train_motor2[:, indices] | |
| batch_motor3 = self.train_motor3[:, indices] | |
| batch_motor4 = self.train_motor4[:, indices] | |
| yield (batch_x, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4) | |
| class HarModel(nn.Module): | |
| """ | |
| Neural network model for rover parameter estimation. | |
| This class implements the LTC (Liquid Time-Constant) neural network that learns | |
| to predict rover physical parameters from trajectory data. The model takes | |
| sequences of rover trajectory data as input and outputs 9 physical parameters. | |
| Architecture: | |
| - Input: [T, B, 6] where T=timesteps, B=batch_size, 6=state features (x,y,psi,vx,vy,wz) | |
| - Output: [T, B, 9] where 9 is the number of rover parameters | |
| - Uses LTC for sequence-to-sequence learning | |
| """ | |
| def __init__(self, model_type="ltc", model_size=64, learning_rate=0.001): | |
| """ | |
| Initialize the neural network model. | |
| Args: | |
| model_type: Type of model ("ltc", "lstm", etc.) | |
| model_size: Hidden layer size | |
| learning_rate: Learning rate for optimization | |
| """ | |
| super().__init__() | |
| self.model_type = model_type | |
| self.model_size = model_size | |
| # Input size is the number of features per timestep (6 for rover state) | |
| input_size = 6 # [x, y, psi, vx, vy, wz] | |
| print("Beginning rover parameter estimation model...") | |
| if model_type == "lstm": | |
| self.rnn = nn.LSTM(input_size, model_size, batch_first=False) | |
| elif model_type.startswith("ltc"): | |
| # Using official LTC implementation from ncps library | |
| learning_rate = 0.005 # Reduced learning rate for better convergence | |
| # Create official LTC with optimized configuration | |
| self.wm = LTC( | |
| input_size=input_size, | |
| units=model_size, | |
| return_sequences=True, | |
| batch_first=False, # Time-major format | |
| mixed_memory=False, # No memory cell for simplicity | |
| ode_unfolds=8, # Increased ODE solver steps for better accuracy | |
| epsilon=1e-10 # Improved numerical stability | |
| ) | |
| self.rnn = self.wm | |
| elif model_type == "ctgru": | |
| self.rnn = nn.GRU(input_size, model_size, batch_first=False) | |
| else: | |
| self.rnn = nn.RNN(input_size, model_size, batch_first=False) | |
| # Output layer: 8 rover parameters | |
| self.dense = nn.Linear(model_size, 9) | |
| self.sigmoid = nn.Sigmoid() | |
| # Improved AdamW optimizer with better settings for parameter estimation | |
| self.optimizer = optim.AdamW(self.parameters(), lr=learning_rate, | |
| weight_decay=1e-4, betas=(0.9, 0.999), eps=1e-8) | |
| self.to(device) | |
| # Improved learning rate scheduler for better convergence | |
| self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts( | |
| self.optimizer, T_0=10, T_mult=2, eta_min=1e-6 | |
| ) | |
| def forward(self, x): | |
| """ | |
| Forward pass through the neural network. | |
| Args: | |
| x: Input trajectory data [T, B, 6] | |
| Returns: | |
| y: Predicted parameters [T, B, 8] | |
| """ | |
| if self.model_type.startswith("ltc"): | |
| # Official LTC returns (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| else: | |
| # Other RNNs return (output, hidden_state) tuple | |
| out, _ = self.rnn(x) # [T,B,H] | |
| T, B, H = out.shape | |
| y = self.sigmoid(self.dense(out.reshape(T*B, H))).reshape(T, B, 9) | |
| return y | |
| def compute_loss(self, y_pred, target_y, uMotor1, uMotor2, uMotor3, uMotor4): | |
| """Build the loss object and call .forward().""" | |
| loss_fn = Custom_CE_Loss(target_y, y_pred, uMotor1, uMotor2, uMotor3, uMotor4) | |
| return loss_fn.forward() | |
| class ThetaSimulator: | |
| """ | |
| Simulator class for running rover simulations with estimated parameters. | |
| This class takes the learned parameters from EMMA and runs a complete | |
| differential drive simulation to validate the parameter estimation. | |
| """ | |
| def __init__(self, dt=0.01): | |
| """ | |
| Initialize the simulator. | |
| Args: | |
| dt: Time step for simulation (seconds) | |
| """ | |
| self.dt = dt | |
| def simulate_trajectory(self, initial_state, motor_commands, parameters, duration=10.0): | |
| """ | |
| Simulate differential drive rover dynamics with learned parameters. | |
| Args: | |
| initial_state: Initial rover state [x, y, psi, vx, vy, wz] | |
| motor_commands: Motor input commands [omega_r, omega_l] | |
| parameters: Learned parameters [a, b, r, m, J, kf, CM, Cd] | |
| duration: Simulation duration (seconds) | |
| Returns: | |
| trajectory: Simulated trajectory [n_steps, 6] | |
| """ | |
| a, b, r, m, J, kf, CM, Cd, Sf = parameters | |
| # Convert tensors to numpy if needed | |
| if isinstance(initial_state, torch.Tensor): | |
| initial_state = initial_state.detach().cpu().numpy() | |
| if isinstance(motor_commands, torch.Tensor): | |
| motor_commands = motor_commands.detach().cpu().numpy() | |
| if isinstance(parameters, torch.Tensor): | |
| parameters = parameters.detach().cpu().numpy() | |
| n_steps = int(duration / self.dt) | |
| trajectory = np.zeros((n_steps, 6)) | |
| trajectory[0] = initial_state | |
| states = initial_state.copy() | |
| for t in range(1, n_steps): | |
| # Get motor commands for current timestep | |
| idx = min(t-1, len(motor_commands)-1) | |
| omega_r, omega_l = motor_commands[idx][0], motor_commands[idx][1] | |
| # Current state | |
| x, y, psi, vx, vy, wz = states | |
| # Differential drive kinematics | |
| L = 2 * b # Track width | |
| v_linear = r * (omega_r + omega_l) / 2.0 # Linear velocity | |
| v_angular = r * (omega_r - omega_l) / L # Angular velocity | |
| # Convert to world frame velocities | |
| vx_cmd = v_linear * np.cos(psi) | |
| vy_cmd = v_linear * np.sin(psi) | |
| wz_cmd = v_angular | |
| # Friction and drag forces | |
| vmag = np.sqrt(vx**2 + vy**2 + 0.01) | |
| F_fx = -kf * m * 9.81 * vx / vmag | |
| F_fy = -kf * m * 9.81 * vy / vmag | |
| T_f = -kf * J * wz / (abs(wz) + 0.01) | |
| F_dx = -Cd * 0.1 * vx**2 * np.sign(vx) | |
| F_dy = -Cd * 0.1 * vy**2 * np.sign(vy) | |
| T_d = -Cd * 0.1 * J * wz**2 * np.sign(wz) | |
| # Total forces and accelerations | |
| Fx = F_fx + F_dx | |
| Fy = F_fy + F_dy | |
| Tz = T_f + T_d | |
| ax = Fx / m | |
| ay = Fy / m | |
| alpha = Tz / J | |
| # Update velocities and positions | |
| vx = vx_cmd + ax * self.dt | |
| vy = vy_cmd + ay * self.dt | |
| wz = wz_cmd + alpha * self.dt | |
| x += vx * self.dt | |
| y += vy * self.dt | |
| psi += wz * self.dt | |
| # Normalize heading angle | |
| psi = np.arctan2(np.sin(psi), np.cos(psi)) | |
| # Update state | |
| states = np.array([x, y, psi, vx, vy, wz]) | |
| trajectory[t] = states | |
| return trajectory | |
| def create_animated_simulation(self, actual_trajectory, EMMA_trajectory): | |
| # Load the full trajectory data for complete path visualization | |
| try: | |
| # Load complete trajectory data | |
| x_data = np.loadtxt('data/xData.txt')[:, 0] | |
| y_data = np.loadtxt('data/yData.txt')[:, 0] | |
| # Convert to meters | |
| pixel_to_meter = 0.005818 | |
| x_full = x_data * pixel_to_meter | |
| y_full = y_data * pixel_to_meter | |
| # Create full actual trajectory | |
| full_actual_trajectory = np.column_stack([x_full, y_full]) | |
| # Load motor commands for full EMMA simulation | |
| omega_r_data = np.loadtxt('data/omega_r.txt')[:, 0] | |
| omega_l_data = np.loadtxt('data/omega_l.txt')[:, 0] | |
| # Load estimated parameters | |
| import pandas as pd | |
| params_df = pd.read_csv('rover_coefficients.csv') | |
| a_est = params_df[params_df['Parameter'] == 'a']['Value'].iloc[0] | |
| b_est = params_df[params_df['Parameter'] == 'b']['Value'].iloc[0] | |
| r_est = params_df[params_df['Parameter'] == 'r']['Value'].iloc[0] | |
| # Calculate full EMMA trajectory using differential drive kinematics | |
| dt = 1.0/60.0 | |
| time_array = np.arange(len(x_full)) * dt | |
| v_linear = r_est * (omega_r_data + omega_l_data) / 2.0 | |
| v_angular = r_est * (omega_r_data - omega_l_data) / (2 * b_est) * 0.05 # Angular velocity scaling | |
| x_EMMA = np.zeros_like(x_full) | |
| y_EMMA = np.zeros_like(y_full) | |
| psi_EMMA = np.zeros_like(time_array) | |
| x_EMMA[0] = x_full[0] | |
| y_EMMA[0] = y_full[0] | |
| psi_EMMA[0] = 0.0 | |
| for i in range(1, len(x_EMMA)): | |
| psi_EMMA[i] = psi_EMMA[i-1] + v_angular[i] * dt | |
| vx = v_linear[i] * np.cos(psi_EMMA[i]) | |
| vy = v_linear[i] * np.sin(psi_EMMA[i]) | |
| x_EMMA[i] = x_EMMA[i-1] + vx * dt | |
| y_EMMA[i] = y_EMMA[i-1] + vy * dt | |
| full_EMMA_trajectory = np.column_stack([x_EMMA, y_EMMA]) | |
| print(f"Full trajectory lengths: Actual={len(full_actual_trajectory)}, EMMA={len(full_EMMA_trajectory)}") | |
| except Exception as e: | |
| print(f"Warning: Could not load full trajectory data: {e}") | |
| print("Using training sequence data instead...") | |
| full_actual_trajectory = actual_trajectory | |
| full_EMMA_trajectory = EMMA_trajectory | |
| # Create animation with full trajectories | |
| fig, ax = plt.subplots(figsize=(14, 10)) | |
| ax.set_xlim(min(np.min(full_actual_trajectory[:,0]), np.min(full_EMMA_trajectory[:,0]))-0.5, | |
| max(np.max(full_actual_trajectory[:,0]), np.max(full_EMMA_trajectory[:,0]))+0.5) | |
| ax.set_ylim(min(np.min(full_actual_trajectory[:,1]), np.min(full_EMMA_trajectory[:,1]))-0.5, | |
| max(np.max(full_actual_trajectory[:,1]), np.max(full_EMMA_trajectory[:,1]))+0.5) | |
| ax.set_xlabel('X Position (m)', fontsize=14) | |
| ax.set_ylabel('Y Position (m)', fontsize=14) | |
| ax.grid(True, alpha=0.3) | |
| ax.set_aspect('equal') | |
| ax.set_title('Rover EMMA Simulation: Actual vs EMMA Trajectory Comparison', fontsize=16, fontweight='bold') | |
| # Plot complete trajectories as background | |
| ax.plot(full_actual_trajectory[:,0], full_actual_trajectory[:,1], 'b-', | |
| linewidth=2, alpha=0.3, label='Complete Actual Path') | |
| ax.plot(full_EMMA_trajectory[:,0], full_EMMA_trajectory[:,1], 'r--', | |
| linewidth=2, alpha=0.3, label='Complete EMMA Path') | |
| # Animated elements - Both actual and EMMA | |
| actual_line, = ax.plot([], [], 'b-', linewidth=4, label='Actual Rover (Animated)', alpha=0.9) | |
| EMMA_line, = ax.plot([], [], 'r--', linewidth=4, label='EMMA Simulation (Animated)', alpha=0.9) | |
| actual_point, = ax.plot([], [], 'bo', markersize=12, label='Actual Rover Position', markeredgecolor='black', markeredgewidth=2) | |
| EMMA_point, = ax.plot([], [], 'rs', markersize=12, label='EMMA Rover Position', markeredgecolor='black', markeredgewidth=2) | |
| # Start and end markers | |
| ax.plot(full_actual_trajectory[0,0], full_actual_trajectory[0,1], 'go', | |
| markersize=15, label='Start', markeredgecolor='black', markeredgewidth=2) | |
| ax.plot(full_actual_trajectory[-1,0], full_actual_trajectory[-1,1], 'bs', | |
| markersize=15, label='Actual End', markeredgecolor='black', markeredgewidth=2) | |
| ax.plot(full_EMMA_trajectory[-1,0], full_EMMA_trajectory[-1,1], 'rs', | |
| markersize=15, label='EMMA End', markeredgecolor='black', markeredgewidth=2) | |
| ax.legend(loc='upper right', fontsize=10) | |
| # Status text | |
| status_text = ax.text(0.02, 0.98, '', transform=ax.transAxes, fontsize=11, va='top', | |
| bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8)) | |
| def animate(frame): | |
| # Use every 2nd frame to make animation smoother but not too fast | |
| step = max(1, len(full_actual_trajectory) // 100) # Show ~100 frames max | |
| current_frame = min(frame * step, len(full_actual_trajectory) - 1) | |
| actual_line.set_data(full_actual_trajectory[:current_frame+1,0], | |
| full_actual_trajectory[:current_frame+1,1]) | |
| EMMA_line.set_data(full_EMMA_trajectory[:current_frame+1,0], | |
| full_EMMA_trajectory[:current_frame+1,1]) | |
| actual_point.set_data([full_actual_trajectory[current_frame,0]], | |
| [full_actual_trajectory[current_frame,1]]) | |
| EMMA_point.set_data([full_EMMA_trajectory[current_frame,0]], | |
| [full_EMMA_trajectory[current_frame,1]]) | |
| time_val = current_frame * dt | |
| a = full_actual_trajectory[current_frame] | |
| e = full_EMMA_trajectory[current_frame] | |
| err = np.sqrt((a[0]-e[0])**2 + (a[1]-e[1])**2) | |
| status_text.set_text(f'Time: {time_val:.2f}s\nActual: ({a[0]:.2f}, {a[1]:.2f})\nEMMA: ({e[0]:.2f}, {e[1]:.2f})\nError: {err:.3f}m\nProgress: {current_frame+1}/{len(full_actual_trajectory)}') | |
| return actual_line, EMMA_line, actual_point, EMMA_point, status_text | |
| # Calculate number of frames for smooth animation | |
| total_frames = min(120, len(full_actual_trajectory) // 2) # Max 120 frames, or half the data points | |
| anim = FuncAnimation(fig, animate, frames=total_frames, interval=100, blit=False, repeat=True) | |
| print("Saving animated simulation with complete trajectory...") | |
| anim.save('rover_EMMA_simulation.gif', writer='pillow', fps=10) | |
| print("Animation saved as 'rover_EMMA_simulation.gif'") | |
| return anim | |
| # Main execution block | |
| if __name__ == "__main__": | |
| """ | |
| Main execution entry point for the rover analysis pipeline. | |
| """ | |
| main() |