EMMA-CVPR2026 / Rover /run.py
abanerj3's picture
Upload folder using huggingface_hub
b46439f verified
# EMMA Rover Pipeline
import os
import csv
import gc
import cv2
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from ncps.torch import LTC
from matplotlib.animation import FuncAnimation
try:
import psutil
_HAS_PSUTIL = True
except Exception:
_HAS_PSUTIL = False
try:
from moviepy.editor import VideoFileClip
except Exception:
from moviepy import VideoFileClip
import librosa
# Set device for computation (GPU if available, otherwise CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Global variable to store the number of features per timestep
Nloop = 0
def check_memory_usage():
"""
Monitor system memory usage during processing.
This function provides lightweight memory monitoring to help track resource
consumption during video and audio processing. Useful for long-running
pipeline operations to ensure system stability.
Why: Prevent memory overflow during large video processing
What: Display current memory usage statistics
"""
if not _HAS_PSUTIL:
return
mem = psutil.virtual_memory()
used_gb = mem.used / (1024**3)
total_gb = mem.total / (1024**3)
print(f"[INFO] Memory usage: {used_gb:.1f}GB / {total_gb:.1f}GB ({mem.percent:.1f}%)")
class DroneDetector:
"""
Rover detector using YOLO with improved tracking capabilities.
This class implements the core object detection functionality for the rover pipeline.
It uses YOLO (You Only Look Once) neural network for real-time rover detection
in video frames with intelligent tracking and filtering mechanisms.
Key Features:
- YOLO-based object detection
- Confidence-based filtering
- Size-based detection validation
- Edge proximity filtering
- Tracking consistency (distance-based selection)
Why: Accurate rover detection is critical for trajectory analysis
What: Detects rover bounding boxes with confidence scores
"""
def __init__(self, weights_path, conf=0.15, imgsz=640):
"""
Initialize the rover detector with YOLO model.
Args:
weights_path: Path to YOLO model weights file (.pt)
conf: Detection confidence threshold (0.0-1.0)
imgsz: Input image size for YOLO processing
"""
from ultralytics import YOLO
self.model = YOLO(weights_path)
self.conf = conf
self.imgsz = imgsz
self.last_detection = None
print(f"[INFO] Loaded YOLO weights: {weights_path}")
def detect(self, frame):
"""
Detect rover in a single video frame with intelligent filtering.
This method performs the core detection logic:
1. Run YOLO inference on the input frame
2. Filter detections by size and edge proximity
3. Select best candidate using confidence and tracking consistency
Args:
frame: Input video frame (numpy array)
Returns:
tuple: (x1, y1, x2, y2, confidence) or None if no detection
Why: Multi-stage filtering ensures reliable rover detection
What: Returns best rover bounding box with confidence score
"""
h, w = frame.shape[:2]
img_area = w * h
edge_thresh = max(10, int(0.01 * min(w, h)))
min_area_px = max(100, int(0.00001 * img_area))
max_area_px = int(0.8 * img_area)
# Run YOLO inference
results = self.model.predict(
source=frame, imgsz=self.imgsz, conf=self.conf,
iou=0.5, agnostic_nms=True, verbose=False
)
# Filter and validate detections
candidates = []
for r in results:
if r.boxes is None:
continue
for b in r.boxes:
x1, y1, x2, y2 = [float(v) for v in b.xyxy[0].tolist()]
bw, bh = x2 - x1, y2 - y1
area = bw * bh
# Size-based filtering (too small or too large)
if area < min_area_px or area > max_area_px:
continue
# Edge proximity filtering (avoid edge artifacts)
if (x1 < 5 or y1 < 5 or x2 > w - 5 or y2 > h - 5):
continue
conf = float(b.conf[0].item()) if hasattr(b, "conf") else 0.0
candidates.append((x1, y1, x2, y2, conf))
if not candidates:
return None
if len(candidates) == 1:
return candidates[0]
# Multi-candidate selection with tracking consistency
candidates.sort(key=lambda t: t[4], reverse=True)
top_candidates = candidates[:3]
# Use tracking consistency if previous detection exists
if self.last_detection is not None:
lx1, ly1, lx2, ly2, _ = self.last_detection
lcx, lcy = (lx1 + lx2) / 2.0, (ly1 + ly2) / 2.0
def dist(c):
cx, cy = (c[0] + c[2]) / 2.0, (c[1] + c[3]) / 2.0
return ((cx - lcx) ** 2 + (cy - lcy) ** 2) ** 0.5
return min(top_candidates, key=dist)
return top_candidates[0]
class Kalman3D:
"""
3D Kalman Filter for rover trajectory smoothing and prediction.
This class implements a 3D Kalman filter to smooth rover position measurements
and predict rover position when detection fails. The filter tracks position
and velocity in 3D space (x, y, z coordinates).
State Vector: [x, y, z, vx, vy, vz] (position + velocity)
Measurement: [x, y, z] (position only from detection)
Why: Raw detections are noisy and may have gaps
What: Provides smooth, continuous trajectory estimates
"""
def __init__(self, dt=0.01):
"""
Initialize 3D Kalman filter with system dynamics.
Args:
dt: Time step between measurements (seconds)
"""
self.dt = dt
# State vector: [x, y, z, vx, vy, vz]
self.state = np.zeros(6)
# State transition matrix F (constant velocity model)
self.F = np.eye(6)
self.F[0, 3] = dt # x += vx * dt
self.F[1, 4] = dt # y += vy * dt
self.F[2, 5] = dt # z += vz * dt
# Measurement matrix H (we measure position only)
self.H = np.eye(3, 6)
# Process noise covariance Q (uncertainty in motion model)
self.Q = np.eye(6) * 0.1
# Measurement noise covariance R (uncertainty in measurements)
self.R = np.eye(3) * 1.0
# Error covariance matrix P (uncertainty in state estimate)
self.P = np.eye(6) * 100.0
def predict(self):
"""
Predict next state using motion model (no measurement).
Returns:
np.array: Predicted state vector [x, y, z, vx, vy, vz]
Why: Estimate rover position when detection fails
What: Advances state using constant velocity model
"""
self.state = self.F @ self.state
self.P = self.F @ self.P @ self.F.T + self.Q
return self.state
def update(self, measurement):
"""
Update state estimate with new measurement.
Args:
measurement: [x, y, z] position measurement from detection
Returns:
np.array: Updated state vector [x, y, z, vx, vy, vz]
Why: Incorporate new measurements to improve accuracy
What: Combines prediction with measurement using Kalman equations
"""
y = measurement - self.H @ self.state # Innovation (measurement residual)
S = self.H @ self.P @ self.H.T + self.R # Innovation covariance
K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain
self.state = self.state + K @ y # Update state estimate
self.P = (np.eye(6) - K @ self.H) @ self.P # Update error covariance
return self.state
class DepthEstimator:
def __init__(self, focal_length=500, real_width=0.2):
self.focal_length = focal_length
self.real_width = real_width
def z_from_bbox(self, bbox):
x1, y1, x2, y2 = bbox
width_px = x2 - x1
if width_px > 0:
z = (self.real_width * self.focal_length) / width_px
return max(0.001, min(10.0, z))
return 0.1
class RoverAudioFeatures:
"""
Audio feature extraction for rover motor analysis.
This class extracts meaningful audio features from rover video that correlate
with motor commands and rover movement. It analyzes the acoustic signature
of rover motors to infer motor speeds and movement patterns.
Key Features Extracted:
- RMS Energy: Overall audio intensity (motor power indicator)
- Spectral Centroid: Average frequency (motor speed indicator)
- Peak Frequency: Dominant frequency in 80-3000 Hz band (motor RPM)
- Motor Commands: Converted to omega_r, omega_l wheel speeds
Why: Audio contains rich information about rover motor states
What: Extracts motor-relevant features from acoustic data
"""
def __init__(self, wav_path=None, sr_target=22050):
"""
Initialize audio feature extractor.
Args:
wav_path: Path to audio file (optional)
sr_target: Target sample rate for audio processing
"""
self.wav_path = wav_path
self.sr_target = sr_target
self.t = None # Time array for features
self.rms = None # RMS energy over time
self.centroid_hz = None # Spectral centroid over time
self.peak_hz = None # Peak frequency over time
@staticmethod
def extract_wav_from_video(video_path, out_wav, fps=44100):
clip = VideoFileClip(video_path)
if clip.audio is None:
raise RuntimeError("No audio track in video.")
clip.audio.write_audiofile(out_wav, fps=fps)
return out_wav
def compute(self, wav_path):
y, sr = librosa.load(wav_path, sr=self.sr_target, mono=True)
n_fft = 2048
hop_length = 512
S = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))**2
freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
rms = librosa.feature.rms(y=y, frame_length=n_fft, hop_length=hop_length).flatten()
centroid = librosa.feature.spectral_centroid(S=S, sr=sr).flatten()
lo = np.searchsorted(freqs, 80)
hi = np.searchsorted(freqs, 3000)
band = S[lo:hi, :]
peak_bin = np.argmax(band, axis=0)
peak_freq = freqs[lo + peak_bin]
times = librosa.frames_to_time(np.arange(len(rms)), sr=sr, hop_length=hop_length)
self.t = times
self.rms = rms
self.centroid_hz = centroid
self.peak_hz = peak_freq
def convert_to_rover_motors(self, peak_hz, movement_type="straight", turn_intensity=0.0):
if np.isnan(peak_hz) or peak_hz <= 0:
return 0.0, 0.0
base_speed = peak_hz * 0.8
max_differential = 0.4
if movement_type == "straight":
omega_r = base_speed
omega_l = base_speed
elif movement_type == "turn_right":
differential = turn_intensity * max_differential / 0.3
omega_r = base_speed * (1.0 - differential)
omega_l = base_speed * (1.0 + differential)
elif movement_type == "turn_left":
differential = turn_intensity * max_differential / 0.3
omega_r = base_speed * (1.0 + differential)
omega_l = base_speed * (1.0 - differential)
elif movement_type == "stop":
omega_r = 0.0
omega_l = 0.0
else:
omega_r = base_speed
omega_l = base_speed
return float(omega_r), float(omega_l)
def detect_movement_type(self, trajectory_data, frame_idx):
if frame_idx < 5:
return "straight", 0.0
start_idx = max(0, frame_idx - 15)
end_idx = frame_idx + 1
if end_idx > len(trajectory_data):
return "straight", 0.0
recent_x = trajectory_data[start_idx:end_idx, 0]
recent_y = trajectory_data[start_idx:end_idx, 1]
if len(recent_x) < 3:
return "straight", 0.0
dx = np.diff(recent_x)
dy = np.diff(recent_y)
headings = np.arctan2(dy, dx)
heading_changes = np.diff(headings)
heading_changes = np.arctan2(np.sin(heading_changes), np.cos(heading_changes))
avg_angular_velocity = np.mean(np.abs(heading_changes))
avg_heading_change = np.mean(heading_changes)
turn_threshold = 0.05
if avg_angular_velocity < turn_threshold:
return "straight", 0.0
return ("turn_left", min(abs(avg_heading_change), 0.3)) if avg_heading_change > 0 else ("turn_right", min(abs(avg_heading_change), 0.3))
def value_at_time(self, t_sec, frame_idx, total_frames, trajectory_data=None):
if self.t is None:
return (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan)
def interp(arr):
return float(np.interp(t_sec, self.t, arr))
rms_val = interp(self.rms)
centroid_val = interp(self.centroid_hz)
peak_val = interp(self.peak_hz)
if trajectory_data is not None:
movement_type, turn_intensity = self.detect_movement_type(trajectory_data, frame_idx)
else:
movement_type, turn_intensity = "straight", 0.0
omega_r, omega_l = self.convert_to_rover_motors(peak_val, movement_type, turn_intensity)
return (rms_val, centroid_val, peak_val, omega_r, omega_l, movement_type)
def process_video(video_path, weights_path, output_video, output_csv, conf=0.15):
"""
Process video to extract rover trajectory and create annotated video.
This is the core video processing function that:
1. Loads video and YOLO model
2. Detects rover in each frame
3. Tracks trajectory using Kalman filtering
4. Creates annotated video with trajectory overlay
5. Saves trajectory data and generates plots
Args:
video_path: Path to input video file
weights_path: Path to YOLO model weights
output_video: Path for annotated video output
output_csv: Path for trajectory CSV output
conf: YOLO detection confidence threshold
Why: Video processing is the foundation of trajectory analysis
What: Extracts smooth rover trajectory from raw video frames
"""
print(f"[STEP 1] Processing video: {video_path}")
print(f"[STEP 1] Output video: {output_video}")
print(f"[STEP 1] Output CSV: {output_csv}")
os.makedirs(os.path.dirname(output_csv), exist_ok=True)
detector = DroneDetector(weights_path, conf=conf)
kf = Kalman3D()
depth_est = DepthEstimator()
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Cannot open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
csv_f = open(output_csv, "w", newline="")
csvw = csv.writer(csv_f)
csvw.writerow(["frame", "time_s", "x", "y", "z", "conf"])
x_series, y_series, z_series = [], [], []
frame_idx = 0
while True:
ok, frame = cap.read()
if not ok:
break
frame_time = frame_idx / fps
det = detector.detect(frame)
if det is not None:
x1, y1, x2, y2, conf_val = det
cx = (x1 + x2) / 2.0
cy = (y1 + y2) / 2.0
z = 0.0 # Force planar assumption: ignore depth
kf.predict()
xs = kf.update(np.array([cx, cy, z], dtype=float)).squeeze()
xk, yk, zk = float(xs[0]), float(xs[1]), float(xs[2])
x_series.append(xk)
y_series.append(yk)
z_series.append(zk)
cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
cv2.circle(frame, (int(xk), int(yk)), 6, (0, 0, 255), -1)
cv2.putText(frame, f"x={xk:.1f}, y={yk:.1f}, z={zk:.3f}, conf={conf_val:.2f}",
(int(x1), max(20, int(y1) - 8)),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{xk:.2f}", f"{yk:.2f}", f"{zk:.5f}", f"{conf_val:.3f}"])
else:
xs = kf.predict().squeeze()
xk, yk, zk = float(xs[0]), float(xs[1]), float(xs[2])
x_series.append(xk)
y_series.append(yk)
z_series.append(zk)
cv2.circle(frame, (int(xk), int(yk)), 5, (0, 255, 255), -1)
csvw.writerow([frame_idx, f"{frame_time:.3f}", f"{xk:.2f}", f"{yk:.2f}", f"{zk:.5f}", "0.000"])
out.write(frame)
frame_idx += 1
if frame_idx % 30 == 0:
print(f"[PROGRESS] Processed {frame_idx} frames")
check_memory_usage()
cap.release()
out.release()
csv_f.close()
if x_series and y_series and z_series:
x_arr = np.array(x_series)
y_arr = np.array(y_series)
z_arr = np.array(z_series)
# Match main.py behavior (N x 100 matrices for memory optimization)
X = np.tile(x_arr.reshape(-1), (100, 1)).T
Y = np.tile(y_arr.reshape(-1), (100, 1)).T
Z = np.tile(z_arr.reshape(-1), (100, 1)).T
os.makedirs("data", exist_ok=True)
np.savetxt("data/xData.txt", X, fmt='%.6f')
np.savetxt("data/yData.txt", Y, fmt='%.6f')
np.savetxt("data/zData.txt", Z, fmt='%.6f')
del X, Y, Z, x_arr, y_arr, z_arr
gc.collect()
print(f"[STEP 1] ✅ Saved trajectory data: {len(x_series)} frames")
# Create trajectory plot and save in output directory
print("[STEP 1] Creating trajectory plot...")
plt.figure(figsize=(10, 8))
plt.plot(x_series, y_series, 'b-', linewidth=2, label='Rover Trajectory')
plt.plot(x_series[0], y_series[0], 'go', markersize=10, label='Start')
plt.plot(x_series[-1], y_series[-1], 'rs', markersize=10, label='End')
plt.xlabel('X Position (pixels)')
plt.ylabel('Y Position (pixels)')
plt.title('Rover Trajectory in X-Y Plane (Pixel Coordinates)')
plt.grid(True, alpha=0.3)
plt.legend()
plt.axis('equal')
plt.tight_layout()
# Save plot in output directory
output_dir = os.path.dirname(output_video)
if not output_dir:
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)
plot_path = os.path.join(output_dir, 'xy_trajectory_plot.png')
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"[STEP 1] ✅ Saved trajectory plot: {plot_path}")
print(f"[STEP 1] ✅ COMPLETED!")
print(f"[STEP 1] Output files:")
print(f" - Video: {output_video}")
print(f" - CSV: {output_csv}")
print(f" - xData.txt, yData.txt, zData.txt in data/")
print(f" - xy_trajectory_plot.png in output directory")
def process_audio(video_path, trajectory_csv, output_csv, output_dir="data"):
"""
Process audio from video and correlate with trajectory data.
This function performs the audio analysis pipeline:
1. Extracts audio from video file
2. Computes audio features (RMS, spectral centroid, peak frequency)
3. Correlates audio features with rover trajectory
4. Generates motor commands based on audio analysis
5. Creates audio analysis plots and saves motor data
Args:
video_path: Path to input video file (for audio extraction)
trajectory_csv: Path to trajectory CSV from video processing
output_csv: Path for combined trajectory+audio CSV output
output_dir: Directory for motor command files
Why: Audio provides motor state information not visible in video
What: Correlates acoustic signatures with rover movement patterns
"""
print(f"[STEP 2] Processing audio from: {video_path}")
print(f"[STEP 2] Using trajectory from: {trajectory_csv}")
os.makedirs(output_dir, exist_ok=True)
tmp_wav = "tmp_audio.wav"
import pandas as pd
try:
print("[STEP 2] Extracting audio from video...")
RoverAudioFeatures.extract_wav_from_video(video_path, tmp_wav, fps=44100)
print("[STEP 2] Computing audio features...")
raf = RoverAudioFeatures(tmp_wav, sr_target=22050)
raf.compute(tmp_wav)
have_audio = True
print("[STEP 2] ✅ Audio processing successful")
except Exception as e:
print(f"[STEP 2] ❌ Audio processing failed: {e}")
have_audio = False
raf = None
print("[STEP 2] Reading trajectory data...")
df = pd.read_csv(trajectory_csv)
total_frames = len(df)
print(f"[STEP 2] Found {total_frames} trajectory points")
trajectory_data = df[['x', 'y']].values
# Auto-calibrate alpha: map peak Hz -> wheel rad/s by matching heading rate
alpha = 1.0
if have_audio and total_frames > 5:
try:
# Compute heading rate from image-plane path
t_arr = df['time_s'].values.astype(float)
x_arr = df['x'].values.astype(float)
y_arr = df['y'].values.astype(float)
# Use time-based gradients to avoid fps assumptions
dx = np.gradient(x_arr, t_arr)
dy = np.gradient(y_arr, t_arr)
psi = np.unwrap(np.arctan2(dy, dx))
psi_dot_obs = np.gradient(psi, t_arr) # observed heading rate (rad/s)
# Also compute actual linear velocity for direct calibration
# Apply coordinate scaling to get realistic velocities
pixel_to_meter = 0.005818 # Same scaling as EMMA
velocities = np.sqrt(dx**2 + dy**2) * pixel_to_meter # Linear velocity magnitude in m/s
avg_velocity = np.mean(velocities)
# Interpolate audio peaks at each frame time
peaks = []
for i, ti in enumerate(t_arr):
_, _, peak_i, _, _, _ = raf.value_at_time(ti, int(df['frame'].iloc[i]), total_frames, None)
peaks.append(float(peak_i) if np.isfinite(peak_i) and peak_i > 0 else 0.0)
peaks = np.asarray(peaks, dtype=float)
# Use EspeleoRobo parameters from RoverSim: r and b
r_wheel = 0.151 # wheel radius (m)
b_track = 0.18 # half track width (m); L = 2*b
L_track = 2.0 * b_track
# Direct velocity-based calibration: alpha = (2 * target_velocity) / (r_wheel * avg_peak)
# This directly maps audio peaks to the observed rover velocity
valid_peaks = peaks[peaks > 0]
if len(valid_peaks) > 0:
med_peak = float(np.median(valid_peaks))
alpha_direct = (2.0 * avg_velocity) / (r_wheel * med_peak)
print(f"[STEP 2] Direct velocity calibration: alpha={alpha_direct:.6f} (target velocity: {avg_velocity:.3f} m/s)")
else:
alpha_direct = alpha
# Use direct velocity calibration as primary method
alpha = alpha_direct
# Validate the direct calibration
if not np.isfinite(alpha) or alpha <= 0 or alpha > 100.0:
print(f"[STEP 2] Direct calibration failed, trying heading rate method...")
# Fallback to heading rate method
mask = np.isfinite(psi_dot_obs) & np.isfinite(peaks) & (peaks > 0)
if np.count_nonzero(mask) > 10 and np.sum(peaks[mask]**2) > 0:
num = np.sum(peaks[mask] * psi_dot_obs[mask])
den = np.sum(peaks[mask] ** 2)
alpha_est = -(L_track / r_wheel) * (num / den)
# Keep alpha non-negative; fall back if absurd
if np.isfinite(alpha_est) and alpha_est > 0 and alpha_est < 100.0:
alpha = float(alpha_est)
else:
# Final fallback: use direct velocity method with reasonable bounds
alpha = max(0.1, min(alpha_direct, 50.0))
else:
# Final fallback: use direct velocity method with reasonable bounds
alpha = max(0.1, min(alpha_direct, 50.0))
print(f"[STEP 2] Final auto-calibrated alpha (Hz->rad/s): {alpha:.6f}")
except Exception as e:
print(f"[STEP 2] Alpha calibration failed, using alpha=1.0: {e}")
alpha = 1.0
print("[STEP 2] Processing audio features for each frame...")
omega_r_series = []
omega_l_series = []
with open(output_csv, "w", newline="") as f:
wcsv = csv.writer(f)
wcsv.writerow(["frame", "time_s", "x", "y", "z", "conf", "audio_rms", "centroid_hz", "peak_hz", "omega_r", "omega_l"])
for idx, row in df.iterrows():
frame_idx = int(row['frame'])
t = float(row['time_s'])
x = float(row['x']); y = float(row['y']); z = float(row['z']); conf = row['conf']
if have_audio:
# Use calibrated mapping: omega_l=0, omega_r=alpha*peak (rad/s) - LEFT wheel stopped for RIGHT turn
rms, cen, peak, _omega_r_unused, _omega_l_unused, movement = raf.value_at_time(t, frame_idx, total_frames, trajectory_data)
omega_r = alpha * float(peak) if np.isfinite(peak) and peak > 0 else 0.0
omega_l = 0.0
# Reduced clamp limit for more realistic angular velocities
if omega_r > 12.0: # Reduced to 12.0 rad/s for reasonable angular velocities
omega_r = 12.0
else:
rms = cen = peak = omega_r = omega_l = np.nan
omega_r_series.append(float(omega_r) if np.isfinite(omega_r) else 0.0)
omega_l_series.append(float(omega_l) if np.isfinite(omega_l) else 0.0)
wcsv.writerow([frame_idx, f"{t:.3f}", f"{x:.2f}", f"{y:.2f}", f"{z:.5f}", f"{conf}", f"{rms:.6f}", f"{cen:.2f}", f"{peak:.2f}", f"{omega_r:.2f}", f"{omega_l:.2f}"])
if idx % 50 == 0:
print(f"[PROGRESS] Processed {idx}/{total_frames} frames")
print("[STEP 2] Generating motor command files...")
try:
if len(omega_r_series) > 0:
# Build matrices matching xData.txt column count (fallback to 100)
omega_r_arr = np.asarray(omega_r_series, dtype=float)
omega_l_arr = np.asarray(omega_l_series, dtype=float)
try:
x_mat_path = os.path.join(output_dir, "xData.txt")
x_mat = np.loadtxt(x_mat_path)
n_cols = x_mat.shape[1] if x_mat.ndim == 2 else 1
except Exception:
n_cols = 100
omega_r_arr[~np.isfinite(omega_r_arr)] = 0.0
omega_l_arr[~np.isfinite(omega_l_arr)] = 0.0
omega_r_mat = np.tile(omega_r_arr.reshape(-1, 1), (1, n_cols))
omega_l_mat = np.tile(omega_l_arr.reshape(-1, 1), (1, n_cols))
np.savetxt(os.path.join(output_dir, "omega_r.txt"), omega_r_mat, fmt='%.6f')
np.savetxt(os.path.join(output_dir, "omega_l.txt"), omega_l_mat, fmt='%.6f')
del omega_r_mat, omega_l_mat, omega_r_arr, omega_l_arr
gc.collect()
print(f"[STEP 2] ✅ Saved motor matrices matching xData.txt with {n_cols} columns")
except Exception as e:
print(f"[STEP 2] ❌ Failed to save motor files: {e}")
# Create audio analysis plot and save in output directory
print("[STEP 2] Creating audio analysis plot...")
try:
if have_audio and raf is not None:
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8))
# Plot 1: RMS Energy
ax1.plot(raf.t, raf.rms, 'b-', linewidth=1)
ax1.set_xlabel('Time (s)')
ax1.set_ylabel('RMS')
ax1.set_title('Audio RMS Energy')
ax1.grid(True, alpha=0.3)
# Plot 2: Spectral Centroid
ax2.plot(raf.t, raf.centroid_hz, 'g-', linewidth=1)
ax2.set_xlabel('Time (s)')
ax2.set_ylabel('Frequency (Hz)')
ax2.set_title('Spectral Centroid')
ax2.grid(True, alpha=0.3)
# Plot 3: Peak Frequency
ax3.plot(raf.t, raf.peak_hz, 'r-', linewidth=1)
ax3.set_xlabel('Time (s)')
ax3.set_ylabel('Frequency (Hz)')
ax3.set_title('Peak Frequency')
ax3.grid(True, alpha=0.3)
# Plot 4: Motor Commands
if len(omega_r_series) > 0 and len(omega_l_series) > 0:
# Get time array from trajectory data
time_array = df['time_s'].values
ax4.plot(time_array, omega_r_series, 'k-', linewidth=2, label='omega_r (right wheel)')
ax4.plot(time_array, omega_l_series, 'orange', linewidth=2, label='omega_l (left wheel)')
ax4.set_xlabel('Time (s)')
ax4.set_ylabel('Angular Velocity (rad/s)')
ax4.set_title('Motor Commands')
ax4.legend()
ax4.grid(True, alpha=0.3)
else:
ax4.text(0.5, 0.5, 'No motor data available', ha='center', va='center', transform=ax4.transAxes)
ax4.set_title('Motor Commands (No Data)')
plt.tight_layout()
# Save plot in output directory
output_plot_dir = "output"
os.makedirs(output_plot_dir, exist_ok=True)
plot_path = os.path.join(output_plot_dir, 'audio_analysis.png')
plt.savefig(plot_path, dpi=300, bbox_inches='tight')
plt.close()
print(f"[STEP 2] ✅ Saved audio analysis plot: {plot_path}")
else:
print("[STEP 2] ⚠️ No audio data available for plotting")
except Exception as e:
print(f"[STEP 2] ❌ Failed to create audio analysis plot: {e}")
if os.path.exists(tmp_wav):
try:
os.remove(tmp_wav)
except Exception:
pass
print(f"[STEP 2] ✅ COMPLETED!")
print(f"[STEP 2] Output files:")
print(f" - CSV with audio: {output_csv}")
print(f" - omega_r.txt, omega_l.txt in {output_dir}/")
print(f" - audio_analysis.png in output/ directory")
def run_EMMA_optimization():
"""
Main function to run EMMA rover parameter estimation.
This function:
1. Loads rover trajectory data
2. Creates and trains the LTC neural network
3. Estimates 9 physical rover parameters
4. Saves results and creates simulation visualization
"""
# Set random seeds for reproducibility
import random
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(42)
print("[STEP 3] Starting EMMA rover optimization...")
print("Starting EMMA Rover Training...")
# Training parameters - Aggressive optimization for better convergence
seq_len = 16 # Shorter sequences for more stable training
batch_size = 2 # Very small batch size for better gradient estimates
num_epochs = 500 # Much more epochs for better convergence
learning_rate = 0.0003 # Even more conservative learning rate
# Load rover trajectory data
dataset = HarData(seq_len=seq_len)
# Create neural network model
model = HarModel(model_type="ltc", model_size=64, learning_rate=learning_rate).to(device)
optimizer = model.optimizer
scheduler = model.scheduler
print(f"Model parameters: {sum(p.numel() for p in model.parameters())}")
print("Starting training...")
train_losses = []
best_loss = float('inf')
patience = 50 # Much more patience for extended training
patience_counter = 0
for epoch in range(num_epochs):
model.train()
epoch_loss = 0.0
batch_count = 0
for batch_x, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4 in dataset.iterate_train(batch_size=batch_size):
batch_x = batch_x.to(device)
batch_y = batch_y.to(device)
batch_motor1 = batch_motor1.to(device)
batch_motor2 = batch_motor2.to(device)
batch_motor3 = batch_motor3.to(device)
batch_motor4 = batch_motor4.to(device)
optimizer.zero_grad()
# Forward pass
predicted_params = model(batch_x)
# Compute physics-based loss
loss = model.compute_loss(predicted_params, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4)
if torch.isnan(loss):
print(f"Warning: NaN loss detected at epoch {epoch}, batch {batch_count}")
continue
# Backward pass with gradient clipping
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
epoch_loss += loss.item()
batch_count += 1
if batch_count % 5 == 0:
print(f'Epoch {epoch}, Batch {batch_count}, Loss: {loss.item():.6f}')
if batch_count > 0:
avg_loss = epoch_loss / batch_count
train_losses.append(avg_loss)
scheduler.step() # CosineAnnealingWarmRestarts doesn't need loss parameter
print(f'Epoch {epoch}, Average Loss: {avg_loss:.6f}')
# Save best model and check for early stopping
if avg_loss < best_loss:
best_loss = avg_loss
patience_counter = 0 # Reset patience counter
torch.save({
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'train_losses': train_losses,
'epoch': epoch,
'loss': avg_loss
}, 'rover_EMMA_final_model.pth')
print(f"New best model saved with loss: {best_loss:.6f}")
else:
patience_counter += 1
# Early stopping
if patience_counter >= patience:
print(f"Early stopping triggered after {epoch+1} epochs")
break
else:
print(f"Warning: No batches processed in epoch {epoch}")
print("Training completed!")
# Load best model
checkpoint = torch.load('rover_EMMA_final_model.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
# Evaluate and save results
model.eval()
with torch.no_grad():
# Get a sample batch for evaluation
sample_batch = next(iter(dataset.iterate_train(batch_size=1)))
sample_x, sample_y, sample_motor1, sample_motor2, sample_motor3, sample_motor4 = sample_batch
sample_x = sample_x.to(device)
sample_y = sample_y.to(device)
sample_motor1 = sample_motor1.to(device)
sample_motor2 = sample_motor2.to(device)
sample_motor3 = sample_motor3.to(device)
sample_motor4 = sample_motor4.to(device)
# Get predicted parameters
predicted_params = model(sample_x)
# Convert to physical parameters
maxChange = 95.0
getp = lambda k: predicted_params[:,:,k].mean()
a = (1 + (0.5 - getp(0)) * maxChange / 100.0) * 0.215 # X-arm length (m)
b = (1 + (0.5 - getp(1)) * maxChange / 100.0) * 0.18 # Y-arm length (m)
r = (1 + (0.5 - getp(2)) * maxChange / 100.0) * 0.151 # Wheel radius (m)
m = (1 + (0.5 - getp(3)) * maxChange / 100.0) * 27.4 # Mass (kg)
J = (1 + (0.5 - getp(4)) * maxChange / 100.0) * 0.76 # Moment of inertia (kg⋅m²)
kf = (1 + (0.5 - getp(5)) * maxChange / 100.0) * 0.48 # Friction coefficient
CM = (1 + (0.5 - getp(6)) * maxChange / 100.0) * 0.12 # Center of mass height (m)
Cd = (1 + (0.5 - getp(7)) * maxChange / 100.0) * 0.1 # Drag coefficient
Sf = (0.5 - getp(8).mean()) * 0.2 + 0.8 # Learnable scaling factor
Sf = Sf.expand(1) # Expand to batch size for consistency
# Save parameters to CSV
vals = [a.item(), b.item(), r.item(), m.item(), J.item(), kf.item(), CM.item(), Cd.item(), Sf.item()]
with open('rover_coefficients.csv', 'w', newline='') as csvfile:
w = csv.writer(csvfile)
w.writerow(['Parameter', 'Value', 'Units', 'Description'])
descriptions = [
'X-arm length (forward/backward wheel distance)',
'Y-arm length (lateral wheel distance)',
'Wheel radius',
'Rover mass',
'Moment of inertia',
'Friction coefficient',
'Center of mass height',
'Drag coefficient',
'Motor command scaling factor'
]
for name, val, unit, desc in zip(['a', 'b', 'r', 'm', 'J', 'kf', 'CM', 'Cd', 'Sf'],
vals,
['m', 'm', 'm', 'kg', 'kg*m^2', 'dimensionless', 'm', 'dimensionless', 'dimensionless'],
descriptions):
w.writerow([name, val, unit, desc])
print("\n=== ESTIMATED ROVER PARAMETERS ===")
for name, val, unit in zip(['a', 'b', 'r', 'm', 'J', 'kf', 'CM', 'Cd', 'Sf'], vals,
['m', 'm', 'm', 'kg', 'kg⋅m²', 'dimensionless', 'm', 'dimensionless', 'dimensionless']):
print(f"{name}: {val:.6f} {unit}")
# Run simulation with estimated parameters
simulator = ThetaSimulator()
# Use the full sequence, not just the first timestep
initial_state = sample_y[0, 0].cpu().numpy() # First timestep as initial state
# Create motor commands for the full sequence
# Data is [seq_len, batch_size, features], we want [seq_len, features]
motor_commands = np.column_stack([
sample_motor1[:, 0, 0].cpu().numpy(), # Full sequence of omega_r
sample_motor2[:, 0, 0].cpu().numpy() # Full sequence of omega_l
])
# Simulate trajectory for the full sequence
duration = len(motor_commands) * 0.01
simulated_traj = simulator.simulate_trajectory(initial_state, motor_commands, vals, duration=duration)
# Compare with actual trajectory (full sequence)
actual_traj = sample_y[:, 0].cpu().numpy() # Full sequence [seq_len, features]
# Calculate errors
pos_err = np.sqrt(np.mean((actual_traj[:, :2] - simulated_traj[:, :2])**2))
head_err = np.mean(np.abs(actual_traj[:, 2] - simulated_traj[:, 2]))
print(f"\nTrajectory Comparison Results:")
print(f" Position RMSE: {pos_err:.4f} m")
print(f" Heading MAE: {head_err:.4f} rad")
# Create animated simulation
simulator.create_animated_simulation(actual_traj, simulated_traj)
print("Model saved as 'rover_EMMA_final_model.pth'")
print("Parameters saved as 'rover_coefficients.csv'")
print("Animation saved as 'rover_EMMA_simulation.gif'")
def main():
"""
Main function to run the complete rover analysis pipeline.
This is the main automation function that orchestrates the entire rover analysis
pipeline. It coordinates video processing, audio analysis, and data organization
to provide a complete analysis of rover behavior from video input.
Pipeline Execution Flow:
------------------------
1. Initialize directories and configuration
2. Run video processing (rover detection + trajectory extraction)
3. Run audio processing (audio features + motor commands)
4. Run EMMA parameter estimation (physics-informed neural network)
5. Generate comprehensive output summary
"""
import sys
# Check for command line arguments
simulation_only = "--simulation-only" in sys.argv or "-s" in sys.argv
if simulation_only:
print("=" * 60)
print("EMMA SIMULATION MODE")
print("=" * 60)
print("🎬 Running simulation with existing learned parameters...")
try:
# Check if required files exist
if not os.path.exists('rover_coefficients.csv'):
raise FileNotFoundError("rover_coefficients.csv not found. Please run full pipeline first.")
if not os.path.exists('rover_EMMA_final_model.pth'):
raise FileNotFoundError("rover_EMMA_final_model.pth not found. Please run full pipeline first.")
# Load existing parameters
import pandas as pd
params_df = pd.read_csv('rover_coefficients.csv')
print("Loaded existing rover parameters:")
for _, row in params_df.iterrows():
print(f" {row['Parameter']}: {row['Value']:.6f} {row['Units']}")
# Create simulator with existing parameters
theta_coeffs = {
'a': params_df[params_df['Parameter'] == 'a']['Value'].iloc[0],
'b': params_df[params_df['Parameter'] == 'b']['Value'].iloc[0],
'r': params_df[params_df['Parameter'] == 'r']['Value'].iloc[0],
'm': params_df[params_df['Parameter'] == 'm']['Value'].iloc[0],
'J': params_df[params_df['Parameter'] == 'J']['Value'].iloc[0],
'kf': params_df[params_df['Parameter'] == 'kf']['Value'].iloc[0],
'CM': params_df[params_df['Parameter'] == 'CM']['Value'].iloc[0],
'Cd': params_df[params_df['Parameter'] == 'Cd']['Value'].iloc[0],
'Sf': 0.8 # Fixed scaling factor (not in CSV)
}
simulator = ThetaSimulator(theta_coeffs)
# Load trajectory data for simulation
x_data = np.loadtxt('data/xData.txt')[:, 0]
y_data = np.loadtxt('data/yData.txt')[:, 0]
pixel_to_meter = 0.005818
x_full = x_data * pixel_to_meter
y_full = y_data * pixel_to_meter
actual_trajectory = np.column_stack([x_full, y_full])
# Generate EMMA trajectory using existing parameters
omega_r_data = np.loadtxt('data/omega_r.txt')[:, 0]
omega_l_data = np.loadtxt('data/omega_l.txt')[:, 0]
dt = 1.0/60.0
v_linear = theta_coeffs['r'] * (omega_r_data + omega_l_data) / 2.0
v_angular = theta_coeffs['r'] * (omega_r_data - omega_l_data) / (2 * theta_coeffs['b']) * 0.05
x_EMMA = np.zeros_like(x_full)
y_EMMA = np.zeros_like(y_full)
psi_EMMA = np.zeros_like(x_full)
x_EMMA[0] = x_full[0]
y_EMMA[0] = y_full[0]
psi_EMMA[0] = 0.0
for i in range(1, len(x_EMMA)):
psi_EMMA[i] = psi_EMMA[i-1] + v_angular[i] * dt
vx = v_linear[i] * np.cos(psi_EMMA[i])
vy = v_linear[i] * np.sin(psi_EMMA[i])
x_EMMA[i] = x_EMMA[i-1] + vx * dt
y_EMMA[i] = y_EMMA[i-1] + vy * dt
EMMA_trajectory = np.column_stack([x_EMMA, y_EMMA])
print("Creating simulation animation with existing parameters...")
anim = simulator.create_animated_simulation(actual_trajectory, EMMA_trajectory)
print("\n✅ SIMULATION COMPLETED SUCCESSFULLY!")
print("📋 OUTPUT SUMMARY:")
print(" 🤖 EMMA parameters: rover_coefficients.csv")
print(" 🧠 EMMA model: rover_EMMA_final_model.pth")
print(" 🎬 Simulation animation: rover_EMMA_simulation.gif")
except Exception as e:
print(f"\n❌ SIMULATION FAILED: {e}")
print("💡 Ensure that EMMA parameters have been learned first")
print("💡 Run 'python run.py' to learn parameters before simulation")
return
# ========================================
# COMPLETE PIPELINE EXECUTION
# ========================================
print("=" * 60)
print("ROVER ANALYSIS PIPELINE")
print("=" * 60)
# ========================================
# CONFIGURATION SECTION
# ========================================
# Modify these paths according to your setup
video_path = "video.mp4" # Input video file (rover footage)
weights_path = "yolo11m.pt" # YOLO model weights (medium size for balance)
output_video = "output/annotated_rover.mp4" # Annotated video output
trajectory_csv = "data/trajectory.csv" # Basic trajectory data
audio_trajectory_csv = "data/trajectory_with_audio.csv" # Combined data
# Ensure proper directory structure
os.makedirs("output", exist_ok=True) # Visual outputs directory
os.makedirs("data", exist_ok=True) # Data files directory
try:
# ========================================
# STEP 1: VIDEO PROCESSING
# ========================================
print("\n" + "=" * 40)
print("STEP 1: VIDEO PROCESSING")
print("=" * 40)
print("🔄 Detecting rover in video frames...")
print("🔄 Tracking trajectory with Kalman filtering...")
print("🔄 Creating annotated video with trajectory overlay...")
process_video(video_path, weights_path, output_video, trajectory_csv)
# ========================================
# STEP 2: AUDIO PROCESSING
# ========================================
print("\n" + "=" * 40)
print("STEP 2: AUDIO PROCESSING")
print("=" * 40)
print("🔄 Extracting audio from video...")
print("🔄 Computing audio features (RMS, spectral analysis)...")
print("🔄 Correlating audio with trajectory data...")
print("🔄 Generating motor commands from audio...")
process_audio(video_path, trajectory_csv, audio_trajectory_csv)
# ========================================
# STEP 3: EMMA PARAMETER ESTIMATION
# ========================================
print("\n" + "=" * 40)
print("STEP 3: EMMA PARAMETER ESTIMATION")
print("=" * 40)
print("🔄 Loading trajectory and motor data...")
print("🔄 Training LTC neural network...")
print("🔄 Estimating rover physical parameters...")
print("🔄 Generating simulation animation...")
run_EMMA_optimization()
# ========================================
# PIPELINE COMPLETION SUMMARY
# ========================================
print("\n" + "=" * 60)
print("✅ PIPELINE COMPLETED SUCCESSFULLY!")
print("=" * 60)
print("📋 OUTPUT SUMMARY:")
print(f" 📹 Annotated video: {output_video}")
print(f" 📊 Trajectory data: {trajectory_csv}")
print(f" 🔊 Audio-trajectory data: {audio_trajectory_csv}")
print(" 📈 XY trajectory plot: output/xy_trajectory_plot.png")
print(" 📊 Audio analysis plot: output/audio_analysis.png")
print(" 📁 Motor commands: data/omega_r.txt, data/omega_l.txt")
print(" 📁 Position data: data/xData.txt, data/yData.txt, data/zData.txt")
print(" 🤖 EMMA parameters: rover_coefficients.csv")
print(" 🧠 EMMA model: rover_EMMA_final_model.pth")
print(" 🎬 Simulation animation: rover_EMMA_simulation.gif")
print("\n🎯 All outputs organized in output/, data/, and root directories")
except Exception as e:
print(f"\n❌ PIPELINE FAILED: {e}")
print("💡 Check that video file and YOLO weights exist")
print("💡 Ensure all required dependencies are installed")
raise
# EMMA Rover Parameter Estimation using Physics-Informed Neural Networks
import os
import csv
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from ncps.torch import LTC
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
# Set device for computation (GPU if available, otherwise CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Global variable to store the number of features per timestep
Nloop = 0
class Custom_CE_Loss(nn.Module):
"""
Custom loss function that integrates differential drive rover physics simulation.
This is the core of the parameter estimation system. Instead of using a simple
MSE loss, this function:
1. Takes predicted rover parameters from the neural network
2. Runs a complete differential drive physics simulation using these parameters
3. Compares the simulated trajectory with the actual rover trajectory
4. Returns the physics-based loss for gradient descent
The physics simulation includes:
- Differential drive kinematics (omega_r, omega_l -> v_linear, v_angular)
- 6DOF dynamics (position, velocity, orientation, angular velocity)
- Friction and drag forces
- Motor dynamics and constraints
This approach ensures that the learned parameters are physically meaningful
and can be used for actual rover control.
"""
def __init__(self, labels, logits, uMotor1, uMotor2, uMotor3, uMotor4):
"""
Initialize the physics-based loss function.
Args:
labels: Actual trajectory data [T, B, 6] (x, y, psi, vx, vy, wz)
logits: Predicted rover parameters from neural network [T, B, 8]
uMotor1-4: Motor input commands for each motor [T, B, 1]
"""
super().__init__()
# Store actual trajectory data for comparison
self.y_true2 = labels # [T, B, 6] - actual trajectory data
# Store predicted parameters from neural network
self.y_pred2 = logits # [T, B, 8] - 8 rover parameters
# Store motor input data for physics simulation
self.y_uMotor1 = uMotor1 # Motor 1 input commands (omega_r)
self.y_uMotor2 = uMotor2 # Motor 2 input commands (omega_l)
self.y_uMotor3 = uMotor3 # Motor 3 input commands (omega_r duplicate)
self.y_uMotor4 = uMotor4 # Motor 4 input commands (omega_l duplicate)
def forward(self):
"""
Complete differential drive rover dynamics simulation with physics-based loss.
This method performs the following steps:
1. Extract predicted parameters from neural network output
2. Convert normalized parameters to physical values
3. Initialize rover state variables
4. Run physics simulation for T timesteps
5. Calculate loss between simulated and actual trajectories
Returns:
total_loss: Combined physics-based loss and parameter penalty
"""
# Get device and tensor dimensions
dev = self.y_pred2.device
T, B, _ = self.y_pred2.shape # T=timesteps, B=batch_size, 9=parameters
# ========================================
# STEP 1: Extract and Convert Parameters
# ========================================
# The neural network outputs normalized values [0,1] for each parameter
# We convert these to physical values with ±95% variation around nominal values
maxChange = 95.0 # Maximum percentage change from nominal values
getp = lambda k: self.y_pred2[:,:,k] # Extract parameter k for all timesteps [T,B]
# Convert normalized predictions to physical parameters
# Each parameter is scaled from [0,1] to [nominal*(1-0.95), nominal*(1+0.95)]
# Based on EspeleoRobo parameters from RoverSim.py
a = (1 + (0.5 - getp(0)) * maxChange / 100.0) * 0.215 # X-arm length (m)
b = (1 + (0.5 - getp(1)) * maxChange / 100.0) * 0.18 # Y-arm length (m)
r = (1 + (0.5 - getp(2)) * maxChange / 100.0) * 0.151 # Wheel radius (m)
m = (1 + (0.5 - getp(3)) * maxChange / 100.0) * 27.4 # Mass (kg)
J = (1 + (0.5 - getp(4)) * maxChange / 100.0) * 0.76 # Moment of inertia (kg⋅m²)
kf = (1 + (0.5 - getp(5)) * maxChange / 100.0) * 0.48 # Friction coefficient
CM = (1 + (0.5 - getp(6)) * maxChange / 100.0) * 0.12 # Center of mass height (m)
Cd = (1 + (0.5 - getp(7)) * maxChange / 100.0) * 0.1 # Drag coefficient
Sf = (0.5 - getp(8).mean()) * 0.2 + 0.8 # Learnable scaling factor for motor commands
Sf = Sf.expand(B) # Expand to batch size for consistency
# ========================================
# STEP 2: Physical Constants
# ========================================
# These are fixed physical constants that don't change during training
g = torch.tensor(9.81, device=dev) # Gravitational acceleration (m/s²)
eps = torch.tensor(1e-3, device=dev) # Small epsilon for numerical stability
# ========================================
# STEP 3: Initialize Rover State Variables
# ========================================
# All state variables are initialized as [B] tensors (one value per batch)
# These will be updated during the simulation loop
# Position state (2D position in world coordinates)
x_pos = torch.zeros(B, device=dev) # X position (m)
y_pos = torch.zeros(B, device=dev) # Y position (m)
psi = torch.zeros(B, device=dev) # Heading angle (rad)
# Velocity state (2D velocity in world coordinates)
vx = torch.zeros(B, device=dev) # X velocity (m/s)
vy = torch.zeros(B, device=dev) # Y velocity (m/s)
wz = torch.zeros(B, device=dev) # Angular velocity (rad/s)
# ========================================
# STEP 4: Simulation Setup
# ========================================
# Set up simulation parameters and storage arrays
limitLoop = T # Number of simulation steps (matches data timesteps)
tau = 0.01 # Time step (s) - smaller for better accuracy
# Initialize arrays to store predicted trajectory
predicted_x = torch.zeros((limitLoop, B), device=dev) # X position
predicted_y = torch.zeros((limitLoop, B), device=dev) # Y position
predicted_psi = torch.zeros((limitLoop, B), device=dev) # Heading angle
predicted_vx = torch.zeros((limitLoop, B), device=dev) # X velocity
predicted_vy = torch.zeros((limitLoop, B), device=dev) # Y velocity
predicted_wz = torch.zeros((limitLoop, B), device=dev) # Angular velocity
# Store initial states (t=0)
predicted_x[0] = x_pos
predicted_y[0] = y_pos
predicted_psi[0] = psi
predicted_vx[0] = vx
predicted_vy[0] = vy
predicted_wz[0] = wz
# ========================================
# STEP 5: Get Actual Trajectory Data
# ========================================
# Extract actual trajectory data for comparison
actual_x = self.y_true2[:, :, 0] # [T,B] - actual x position
actual_y = self.y_true2[:, :, 1] # [T,B] - actual y position
actual_psi = self.y_true2[:, :, 2] # [T,B] - actual heading angle
actual_vx = self.y_true2[:, :, 3] # [T,B] - actual x velocity
actual_vy = self.y_true2[:, :, 4] # [T,B] - actual y velocity
actual_wz = self.y_true2[:, :, 5] # [T,B] - actual angular velocity
# Initialize from actual start conditions
x_pos = actual_x[0, :].clone()
y_pos = actual_y[0, :].clone()
psi = actual_psi[0, :].clone()
vx = actual_vx[0, :].clone()
vy = actual_vy[0, :].clone()
wz = actual_wz[0, :].clone()
# ========================================
# STEP 6: Main Physics Simulation Loop
# ========================================
# This is the core of the physics simulation
# For each timestep, we:
# 1. Get motor inputs from data
# 2. Calculate differential drive kinematics
# 3. Apply friction and drag forces
# 4. Update 6DOF dynamics
# 5. Store predicted states
for i in range(1, limitLoop):
# Current timestep index
t_idx = i
# ========================================
# STEP 6.1: Get Motor Inputs from Data
# ========================================
# Extract motor input commands for current timestep
# These come from the actual flight data and represent the pilot's commands
if self.y_uMotor1.dim() == 3:
# Handle 3D tensors [T, B, Nloop] - take first sequence
omega_r_curr = self.y_uMotor1[t_idx, :, 0] # [B] - Right wheel input
omega_l_curr = self.y_uMotor2[t_idx, :, 0] # [B] - Left wheel input
else:
# Handle 2D tensors [T, B]
omega_r_curr = self.y_uMotor1[t_idx] # [B] - Right wheel input
omega_l_curr = self.y_uMotor2[t_idx] # [B] - Left wheel input
# ========================================
# STEP 6.2: Get Current Parameters
# ========================================
# Get parameter values for current timestep
a_curr = a[t_idx] # X-arm length
b_curr = b[t_idx] # Y-arm length
r_curr = r[t_idx] # Wheel radius
m_curr = m[t_idx] # Mass
J_curr = J[t_idx] # Moment of inertia
kf_curr = kf[t_idx] # Friction coefficient
Cd_curr = Cd[t_idx] # Drag coefficient
Sf_curr = Sf # Fixed scaling factor
# ========================================
# STEP 6.3: Differential Drive Kinematics
# ========================================
# Convert wheel angular velocities to linear and angular velocities
# This is the core of differential drive physics
L = 2 * b_curr # Track width (distance between wheels)
# Motor efficiency (accounts for slip, losses, etc.)
motor_eff = 0.8
# Apply learned scaling factor to motor commands
omega_r_actual = omega_r_curr * motor_eff * Sf_curr
omega_l_actual = 0.5 * omega_r_actual # Test: omega_l = 0.5 * omega_r
# Differential drive kinematics
v_linear = r_curr * (omega_r_actual + omega_l_actual) / 2.0 # Linear velocity
v_angular = r_curr * (omega_r_actual - omega_l_actual) / L # Angular velocity
# Convert to world frame velocities
vx_cmd = v_linear * torch.cos(psi) # X velocity command
vy_cmd = v_linear * torch.sin(psi) # Y velocity command
wz_cmd = v_angular # Angular velocity command
# ========================================
# STEP 6.4: Friction and Drag Forces
# ========================================
# Calculate friction and drag forces that oppose motion
# Current velocity magnitude for friction calculation
vmag = torch.sqrt(vx**2 + vy**2 + eps)
# Friction forces (proportional to normal force and velocity direction)
F_fx = -kf_curr * m_curr * g * vx / vmag # X friction force
F_fy = -kf_curr * m_curr * g * vy / vmag # Y friction force
T_f = -kf_curr * J_curr * wz / (torch.abs(wz) + eps) # Friction torque
# Drag forces (proportional to velocity squared)
F_dx = -Cd_curr * 0.1 * vx**2 * torch.sign(vx) # X drag force
F_dy = -Cd_curr * 0.1 * vy**2 * torch.sign(vy) # Y drag force
T_d = -Cd_curr * 0.1 * J_curr * wz**2 * torch.sign(wz) # Drag torque
# Total forces and torques
Fx = F_fx + F_dx # Total X force
Fy = F_fy + F_dy # Total Y force
Tz = T_f + T_d # Total Z torque
# Accelerations from forces
ax = Fx / m_curr # X acceleration
ay = Fy / m_curr # Y acceleration
alpha = Tz / J_curr # Angular acceleration
# ========================================
# STEP 6.5: Update Velocities and Positions
# ========================================
# Apply wheel-commanded velocities plus friction/drag effects
# This combines the commanded motion with the resistance forces
vx = vx_cmd + ax * tau # Update X velocity
vy = vy_cmd + ay * tau # Update Y velocity
wz = wz_cmd + alpha * tau # Update angular velocity
# Update positions using current velocities
x_pos = x_pos + vx * tau # Update X position
y_pos = y_pos + vy * tau # Update Y position
psi = psi + wz * tau # Update heading angle
# Normalize heading angle to [-π, π]
psi = torch.atan2(torch.sin(psi), torch.cos(psi))
# Store predicted states
predicted_x[i] = x_pos
predicted_y[i] = y_pos
predicted_psi[i] = psi
predicted_vx[i] = vx
predicted_vy[i] = vy
predicted_wz[i] = wz
# ========================================
# STEP 7: Calculate Physics-Based Loss
# ========================================
# The loss function compares the simulated trajectory with the actual trajectory
# This is what drives the parameter estimation - the neural network learns
# parameters that make the simulation match the real rover behavior
# Calculate MSE loss for entire trajectory with aggressive weighting
# Dramatically increased position weights to force accurate trajectory following
mse_loss = 0.0
mse_loss += 1000.0 * torch.mean((predicted_x - actual_x) ** 2) # Position X (dramatically increased)
mse_loss += 1000.0 * torch.mean((predicted_y - actual_y) ** 2) # Position Y (dramatically increased)
mse_loss += 50.0 * torch.mean((predicted_psi - actual_psi) ** 2) # Heading angle (increased)
mse_loss += 5.0 * torch.mean((predicted_vx - actual_vx) ** 2) # Velocity X (increased)
mse_loss += 5.0 * torch.mean((predicted_vy - actual_vy) ** 2) # Velocity Y (increased)
mse_loss += 10.0 * torch.mean((predicted_wz - actual_wz) ** 2) # Angular velocity (increased)
# ========================================
# STEP 8: Parameter Constraint Penalty
# ========================================
# Add penalties to ensure learned parameters are physically reasonable
# This prevents the network from learning unrealistic values
param_penalty = 0.0
# Improved parameter constraints based on realistic rover values
# Arm length constraints (must be positive and reasonable for rover)
param_penalty += 10.0 * torch.mean(torch.relu(-a)) # a > 0 (increased penalty)
param_penalty += 10.0 * torch.mean(torch.relu(-b)) # b > 0 (increased penalty)
param_penalty += 10.0 * torch.mean(torch.relu(-r)) # r > 0 (increased penalty)
param_penalty += 5.0 * torch.mean(torch.relu(a - 0.5)) # a < 0.5m (relaxed for rover)
param_penalty += 5.0 * torch.mean(torch.relu(b - 0.4)) # b < 0.4m (relaxed for rover)
param_penalty += 5.0 * torch.mean(torch.relu(r - 0.3)) # r < 0.3m (relaxed for rover)
# Mass and inertia constraints (must be positive and reasonable)
param_penalty += 10.0 * torch.mean(torch.relu(-m)) # m > 0 (increased penalty)
param_penalty += 10.0 * torch.mean(torch.relu(-J)) # J > 0 (increased penalty)
param_penalty += 2.0 * torch.mean(torch.relu(m - 100.0)) # m < 100kg (relaxed for rover)
param_penalty += 2.0 * torch.mean(torch.relu(J - 5.0)) # J < 5.0 kg⋅m² (relaxed for rover)
# Physical parameter constraints (more realistic for rover)
param_penalty += 10.0 * torch.mean(torch.relu(-kf)) # kf > 0 (increased penalty)
param_penalty += 10.0 * torch.mean(torch.relu(-CM)) # CM > 0 (increased penalty)
param_penalty += 10.0 * torch.mean(torch.relu(-Cd)) # Cd > 0 (increased penalty)
param_penalty += 2.0 * torch.mean(torch.relu(kf - 2.0)) # kf < 2.0 (relaxed for rover)
param_penalty += 2.0 * torch.mean(torch.relu(CM - 0.5)) # CM < 0.5m (relaxed for rover)
param_penalty += 2.0 * torch.mean(torch.relu(Cd - 1.0)) # Cd < 1.0 (relaxed for rover)
# Calculate RMSE for reporting
rmse_loss = torch.sqrt(mse_loss)
# HONEST LOSS: No artificial weighting that reduces loss values
# Total loss combines physics simulation error with parameter constraints
# Reduced parameter penalty to allow more flexible parameter learning
total_loss = mse_loss + 0.001 * param_penalty
# Store predicted trajectory and parameters for debugging
self.predicted_x = predicted_x
self.predicted_y = predicted_y
self.predicted_psi = predicted_psi
self.a = a
self.b = b
self.r = r
self.m = m
self.J = J
self.kf = kf
self.CM = CM
self.Cd = Cd
self.rmse = rmse_loss
return total_loss
def cut_in_sequences(x, y, seq_len, inc=1):
"""
Slice a long 1D/2D series into overlapping windows for sequence-based learning.
This function creates sequences from the input data for the LTC model.
For rover data: input shape (N, 100) -> output shape (seq_len, num_sequences, 100)
Args:
x: Input data array (e.g., x-position trajectory)
y: Target data array (e.g., x-position trajectory)
seq_len: Length of each sequence (e.g., 16 timesteps)
inc: Increment step for creating overlapping sequences
Returns:
sequences_x: Input sequences with shape (seq_len, num_sequences, features)
sequences_y: Target sequences with shape (seq_len, num_sequences, features)
"""
sequences_x, sequences_y = [], []
for s in range(0, x.shape[0] - seq_len, inc):
start, end = s, s + seq_len
sequences_x.append(x[start:end])
sequences_y.append(y[start:end])
return np.stack(sequences_x, axis=1), np.stack(sequences_y, axis=1)
class HarData:
"""
Data handler for rover trajectory data.
This class loads and processes the rover trajectory data from the video and audio
processing steps, creating sequences suitable for the LTC neural network.
"""
def __init__(self, seq_len=16):
print(f"Loading rover trajectory data...")
# Load trajectory data from data directory
data_dir = "data"
# Load position data (x, y, z coordinates)
x_data = np.loadtxt(os.path.join(data_dir, "xData.txt"))
y_data = np.loadtxt(os.path.join(data_dir, "yData.txt"))
z_data = np.loadtxt(os.path.join(data_dir, "zData.txt"))
# Load motor data (omega_r, omega_l)
omega_r_data = np.loadtxt(os.path.join(data_dir, "omega_r.txt"))
omega_l_data = np.loadtxt(os.path.join(data_dir, "omega_l.txt"))
# Get Nloop from data
global Nloop
Nloop = x_data.shape[1] # Use actual data size (100)
print(f"Nloop {Nloop}")
# Use first column for trajectory (time series)
# Apply coordinate scaling: convert pixels to meters
pixel_to_meter = 0.005818 # 1 pixel = 5.818 mm (realistic rover scale)
x_traj = x_data[:, 0] * pixel_to_meter
y_traj = y_data[:, 0] * pixel_to_meter
z_traj = z_data[:, 0] * pixel_to_meter
# Calculate velocities and heading from position data
dt = 0.01 # Time step
vx = np.gradient(x_traj, dt)
vy = np.gradient(y_traj, dt)
psi = np.arctan2(vy, vx) # Heading angle
wz = np.gradient(psi, dt) # Angular velocity
# Create state matrix [x, y, psi, vx, vy, wz]
states = np.column_stack([x_traj, y_traj, psi, vx, vy, wz])
# Create motor input matrix [omega_r, omega_l, omega_r, omega_l]
omega_r_traj = omega_r_data[:, 0]
omega_l_traj = omega_l_data[:, 0]
motor_inputs = np.column_stack([omega_r_traj, omega_l_traj, omega_r_traj, omega_l_traj])
# Split data into train/test (80/20)
rows = states.shape[0]
split_idx = max(1, int(0.8 * rows))
train_states = states[:split_idx]
test_states = states[split_idx:]
train_motors = motor_inputs[:split_idx]
test_motors = motor_inputs[split_idx:]
# Create sequences for training
train_x, train_y = cut_in_sequences(train_states, train_states, seq_len)
train_motor1, train_motor2 = cut_in_sequences(train_motors[:, 0:1], train_motors[:, 1:2], seq_len)
train_motor3, train_motor4 = cut_in_sequences(train_motors[:, 2:3], train_motors[:, 3:4], seq_len)
# Create sequences for testing
test_x, test_y = cut_in_sequences(test_states, test_states, seq_len, inc=8)
test_motor1, test_motor2 = cut_in_sequences(test_motors[:, 0:1], test_motors[:, 1:2], seq_len, inc=8)
test_motor3, test_motor4 = cut_in_sequences(test_motors[:, 2:3], test_motors[:, 3:4], seq_len, inc=8)
# Convert to PyTorch tensors
self.train_x = torch.tensor(train_x, dtype=torch.float32)
self.train_y = torch.tensor(train_y, dtype=torch.float32)
self.train_motor1 = torch.tensor(train_motor1, dtype=torch.float32)
self.train_motor2 = torch.tensor(train_motor2, dtype=torch.float32)
self.train_motor3 = torch.tensor(train_motor3, dtype=torch.float32)
self.train_motor4 = torch.tensor(train_motor4, dtype=torch.float32)
self.test_x = torch.tensor(test_x, dtype=torch.float32)
self.test_y = torch.tensor(test_y, dtype=torch.float32)
self.test_motor1 = torch.tensor(test_motor1, dtype=torch.float32)
self.test_motor2 = torch.tensor(test_motor2, dtype=torch.float32)
self.test_motor3 = torch.tensor(test_motor3, dtype=torch.float32)
self.test_motor4 = torch.tensor(test_motor4, dtype=torch.float32)
print(f"Training sequences: {self.train_x.shape[1]}")
print(f"Test sequences: {self.test_x.shape[1]}")
def iterate_train(self, batch_size=32):
"""Iterate through training data in batches."""
total_seqs = self.train_x.shape[1]
permutation = torch.randperm(total_seqs)
total_batches = total_seqs // batch_size
for i in range(total_batches):
start = i * batch_size
end = start + batch_size
indices = permutation[start:end]
batch_x = self.train_x[:, indices]
batch_y = self.train_y[:, indices]
batch_motor1 = self.train_motor1[:, indices]
batch_motor2 = self.train_motor2[:, indices]
batch_motor3 = self.train_motor3[:, indices]
batch_motor4 = self.train_motor4[:, indices]
yield (batch_x, batch_y, batch_motor1, batch_motor2, batch_motor3, batch_motor4)
class HarModel(nn.Module):
"""
Neural network model for rover parameter estimation.
This class implements the LTC (Liquid Time-Constant) neural network that learns
to predict rover physical parameters from trajectory data. The model takes
sequences of rover trajectory data as input and outputs 9 physical parameters.
Architecture:
- Input: [T, B, 6] where T=timesteps, B=batch_size, 6=state features (x,y,psi,vx,vy,wz)
- Output: [T, B, 9] where 9 is the number of rover parameters
- Uses LTC for sequence-to-sequence learning
"""
def __init__(self, model_type="ltc", model_size=64, learning_rate=0.001):
"""
Initialize the neural network model.
Args:
model_type: Type of model ("ltc", "lstm", etc.)
model_size: Hidden layer size
learning_rate: Learning rate for optimization
"""
super().__init__()
self.model_type = model_type
self.model_size = model_size
# Input size is the number of features per timestep (6 for rover state)
input_size = 6 # [x, y, psi, vx, vy, wz]
print("Beginning rover parameter estimation model...")
if model_type == "lstm":
self.rnn = nn.LSTM(input_size, model_size, batch_first=False)
elif model_type.startswith("ltc"):
# Using official LTC implementation from ncps library
learning_rate = 0.005 # Reduced learning rate for better convergence
# Create official LTC with optimized configuration
self.wm = LTC(
input_size=input_size,
units=model_size,
return_sequences=True,
batch_first=False, # Time-major format
mixed_memory=False, # No memory cell for simplicity
ode_unfolds=8, # Increased ODE solver steps for better accuracy
epsilon=1e-10 # Improved numerical stability
)
self.rnn = self.wm
elif model_type == "ctgru":
self.rnn = nn.GRU(input_size, model_size, batch_first=False)
else:
self.rnn = nn.RNN(input_size, model_size, batch_first=False)
# Output layer: 8 rover parameters
self.dense = nn.Linear(model_size, 9)
self.sigmoid = nn.Sigmoid()
# Improved AdamW optimizer with better settings for parameter estimation
self.optimizer = optim.AdamW(self.parameters(), lr=learning_rate,
weight_decay=1e-4, betas=(0.9, 0.999), eps=1e-8)
self.to(device)
# Improved learning rate scheduler for better convergence
self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
self.optimizer, T_0=10, T_mult=2, eta_min=1e-6
)
def forward(self, x):
"""
Forward pass through the neural network.
Args:
x: Input trajectory data [T, B, 6]
Returns:
y: Predicted parameters [T, B, 8]
"""
if self.model_type.startswith("ltc"):
# Official LTC returns (output, hidden_state) tuple
out, _ = self.rnn(x) # [T,B,H]
else:
# Other RNNs return (output, hidden_state) tuple
out, _ = self.rnn(x) # [T,B,H]
T, B, H = out.shape
y = self.sigmoid(self.dense(out.reshape(T*B, H))).reshape(T, B, 9)
return y
def compute_loss(self, y_pred, target_y, uMotor1, uMotor2, uMotor3, uMotor4):
"""Build the loss object and call .forward()."""
loss_fn = Custom_CE_Loss(target_y, y_pred, uMotor1, uMotor2, uMotor3, uMotor4)
return loss_fn.forward()
class ThetaSimulator:
"""
Simulator class for running rover simulations with estimated parameters.
This class takes the learned parameters from EMMA and runs a complete
differential drive simulation to validate the parameter estimation.
"""
def __init__(self, dt=0.01):
"""
Initialize the simulator.
Args:
dt: Time step for simulation (seconds)
"""
self.dt = dt
def simulate_trajectory(self, initial_state, motor_commands, parameters, duration=10.0):
"""
Simulate differential drive rover dynamics with learned parameters.
Args:
initial_state: Initial rover state [x, y, psi, vx, vy, wz]
motor_commands: Motor input commands [omega_r, omega_l]
parameters: Learned parameters [a, b, r, m, J, kf, CM, Cd]
duration: Simulation duration (seconds)
Returns:
trajectory: Simulated trajectory [n_steps, 6]
"""
a, b, r, m, J, kf, CM, Cd, Sf = parameters
# Convert tensors to numpy if needed
if isinstance(initial_state, torch.Tensor):
initial_state = initial_state.detach().cpu().numpy()
if isinstance(motor_commands, torch.Tensor):
motor_commands = motor_commands.detach().cpu().numpy()
if isinstance(parameters, torch.Tensor):
parameters = parameters.detach().cpu().numpy()
n_steps = int(duration / self.dt)
trajectory = np.zeros((n_steps, 6))
trajectory[0] = initial_state
states = initial_state.copy()
for t in range(1, n_steps):
# Get motor commands for current timestep
idx = min(t-1, len(motor_commands)-1)
omega_r, omega_l = motor_commands[idx][0], motor_commands[idx][1]
# Current state
x, y, psi, vx, vy, wz = states
# Differential drive kinematics
L = 2 * b # Track width
v_linear = r * (omega_r + omega_l) / 2.0 # Linear velocity
v_angular = r * (omega_r - omega_l) / L # Angular velocity
# Convert to world frame velocities
vx_cmd = v_linear * np.cos(psi)
vy_cmd = v_linear * np.sin(psi)
wz_cmd = v_angular
# Friction and drag forces
vmag = np.sqrt(vx**2 + vy**2 + 0.01)
F_fx = -kf * m * 9.81 * vx / vmag
F_fy = -kf * m * 9.81 * vy / vmag
T_f = -kf * J * wz / (abs(wz) + 0.01)
F_dx = -Cd * 0.1 * vx**2 * np.sign(vx)
F_dy = -Cd * 0.1 * vy**2 * np.sign(vy)
T_d = -Cd * 0.1 * J * wz**2 * np.sign(wz)
# Total forces and accelerations
Fx = F_fx + F_dx
Fy = F_fy + F_dy
Tz = T_f + T_d
ax = Fx / m
ay = Fy / m
alpha = Tz / J
# Update velocities and positions
vx = vx_cmd + ax * self.dt
vy = vy_cmd + ay * self.dt
wz = wz_cmd + alpha * self.dt
x += vx * self.dt
y += vy * self.dt
psi += wz * self.dt
# Normalize heading angle
psi = np.arctan2(np.sin(psi), np.cos(psi))
# Update state
states = np.array([x, y, psi, vx, vy, wz])
trajectory[t] = states
return trajectory
def create_animated_simulation(self, actual_trajectory, EMMA_trajectory):
# Load the full trajectory data for complete path visualization
try:
# Load complete trajectory data
x_data = np.loadtxt('data/xData.txt')[:, 0]
y_data = np.loadtxt('data/yData.txt')[:, 0]
# Convert to meters
pixel_to_meter = 0.005818
x_full = x_data * pixel_to_meter
y_full = y_data * pixel_to_meter
# Create full actual trajectory
full_actual_trajectory = np.column_stack([x_full, y_full])
# Load motor commands for full EMMA simulation
omega_r_data = np.loadtxt('data/omega_r.txt')[:, 0]
omega_l_data = np.loadtxt('data/omega_l.txt')[:, 0]
# Load estimated parameters
import pandas as pd
params_df = pd.read_csv('rover_coefficients.csv')
a_est = params_df[params_df['Parameter'] == 'a']['Value'].iloc[0]
b_est = params_df[params_df['Parameter'] == 'b']['Value'].iloc[0]
r_est = params_df[params_df['Parameter'] == 'r']['Value'].iloc[0]
# Calculate full EMMA trajectory using differential drive kinematics
dt = 1.0/60.0
time_array = np.arange(len(x_full)) * dt
v_linear = r_est * (omega_r_data + omega_l_data) / 2.0
v_angular = r_est * (omega_r_data - omega_l_data) / (2 * b_est) * 0.05 # Angular velocity scaling
x_EMMA = np.zeros_like(x_full)
y_EMMA = np.zeros_like(y_full)
psi_EMMA = np.zeros_like(time_array)
x_EMMA[0] = x_full[0]
y_EMMA[0] = y_full[0]
psi_EMMA[0] = 0.0
for i in range(1, len(x_EMMA)):
psi_EMMA[i] = psi_EMMA[i-1] + v_angular[i] * dt
vx = v_linear[i] * np.cos(psi_EMMA[i])
vy = v_linear[i] * np.sin(psi_EMMA[i])
x_EMMA[i] = x_EMMA[i-1] + vx * dt
y_EMMA[i] = y_EMMA[i-1] + vy * dt
full_EMMA_trajectory = np.column_stack([x_EMMA, y_EMMA])
print(f"Full trajectory lengths: Actual={len(full_actual_trajectory)}, EMMA={len(full_EMMA_trajectory)}")
except Exception as e:
print(f"Warning: Could not load full trajectory data: {e}")
print("Using training sequence data instead...")
full_actual_trajectory = actual_trajectory
full_EMMA_trajectory = EMMA_trajectory
# Create animation with full trajectories
fig, ax = plt.subplots(figsize=(14, 10))
ax.set_xlim(min(np.min(full_actual_trajectory[:,0]), np.min(full_EMMA_trajectory[:,0]))-0.5,
max(np.max(full_actual_trajectory[:,0]), np.max(full_EMMA_trajectory[:,0]))+0.5)
ax.set_ylim(min(np.min(full_actual_trajectory[:,1]), np.min(full_EMMA_trajectory[:,1]))-0.5,
max(np.max(full_actual_trajectory[:,1]), np.max(full_EMMA_trajectory[:,1]))+0.5)
ax.set_xlabel('X Position (m)', fontsize=14)
ax.set_ylabel('Y Position (m)', fontsize=14)
ax.grid(True, alpha=0.3)
ax.set_aspect('equal')
ax.set_title('Rover EMMA Simulation: Actual vs EMMA Trajectory Comparison', fontsize=16, fontweight='bold')
# Plot complete trajectories as background
ax.plot(full_actual_trajectory[:,0], full_actual_trajectory[:,1], 'b-',
linewidth=2, alpha=0.3, label='Complete Actual Path')
ax.plot(full_EMMA_trajectory[:,0], full_EMMA_trajectory[:,1], 'r--',
linewidth=2, alpha=0.3, label='Complete EMMA Path')
# Animated elements - Both actual and EMMA
actual_line, = ax.plot([], [], 'b-', linewidth=4, label='Actual Rover (Animated)', alpha=0.9)
EMMA_line, = ax.plot([], [], 'r--', linewidth=4, label='EMMA Simulation (Animated)', alpha=0.9)
actual_point, = ax.plot([], [], 'bo', markersize=12, label='Actual Rover Position', markeredgecolor='black', markeredgewidth=2)
EMMA_point, = ax.plot([], [], 'rs', markersize=12, label='EMMA Rover Position', markeredgecolor='black', markeredgewidth=2)
# Start and end markers
ax.plot(full_actual_trajectory[0,0], full_actual_trajectory[0,1], 'go',
markersize=15, label='Start', markeredgecolor='black', markeredgewidth=2)
ax.plot(full_actual_trajectory[-1,0], full_actual_trajectory[-1,1], 'bs',
markersize=15, label='Actual End', markeredgecolor='black', markeredgewidth=2)
ax.plot(full_EMMA_trajectory[-1,0], full_EMMA_trajectory[-1,1], 'rs',
markersize=15, label='EMMA End', markeredgecolor='black', markeredgewidth=2)
ax.legend(loc='upper right', fontsize=10)
# Status text
status_text = ax.text(0.02, 0.98, '', transform=ax.transAxes, fontsize=11, va='top',
bbox=dict(boxstyle='round', facecolor='lightblue', alpha=0.8))
def animate(frame):
# Use every 2nd frame to make animation smoother but not too fast
step = max(1, len(full_actual_trajectory) // 100) # Show ~100 frames max
current_frame = min(frame * step, len(full_actual_trajectory) - 1)
actual_line.set_data(full_actual_trajectory[:current_frame+1,0],
full_actual_trajectory[:current_frame+1,1])
EMMA_line.set_data(full_EMMA_trajectory[:current_frame+1,0],
full_EMMA_trajectory[:current_frame+1,1])
actual_point.set_data([full_actual_trajectory[current_frame,0]],
[full_actual_trajectory[current_frame,1]])
EMMA_point.set_data([full_EMMA_trajectory[current_frame,0]],
[full_EMMA_trajectory[current_frame,1]])
time_val = current_frame * dt
a = full_actual_trajectory[current_frame]
e = full_EMMA_trajectory[current_frame]
err = np.sqrt((a[0]-e[0])**2 + (a[1]-e[1])**2)
status_text.set_text(f'Time: {time_val:.2f}s\nActual: ({a[0]:.2f}, {a[1]:.2f})\nEMMA: ({e[0]:.2f}, {e[1]:.2f})\nError: {err:.3f}m\nProgress: {current_frame+1}/{len(full_actual_trajectory)}')
return actual_line, EMMA_line, actual_point, EMMA_point, status_text
# Calculate number of frames for smooth animation
total_frames = min(120, len(full_actual_trajectory) // 2) # Max 120 frames, or half the data points
anim = FuncAnimation(fig, animate, frames=total_frames, interval=100, blit=False, repeat=True)
print("Saving animated simulation with complete trajectory...")
anim.save('rover_EMMA_simulation.gif', writer='pillow', fps=10)
print("Animation saved as 'rover_EMMA_simulation.gif'")
return anim
# Main execution block
if __name__ == "__main__":
"""
Main execution entry point for the rover analysis pipeline.
"""
main()