Georg
Enhance mujoco_server.py and README.md for improved teleoperation and environment management
9078db7 | """Quadruped-PyMPC inspired Controller for Spot robot. | |
| This controller uses components from Quadruped-PyMPC for: | |
| - Periodic gait generation | |
| - Swing trajectory planning | |
| But outputs joint position targets (not torques) to work with SpotEnv's | |
| position-controlled actuators. | |
| Requires: quadruped_pympc, jax, gym_quadruped | |
| """ | |
| import numpy as np | |
| import os | |
| import importlib.util | |
| # Import BaseController | |
| _controllers_dir = os.path.dirname(os.path.abspath(__file__)) | |
| _base_path = os.path.join(_controllers_dir, "base.py") | |
| _spec = importlib.util.spec_from_file_location("spot_base", _base_path) | |
| _base_module = importlib.util.module_from_spec(_spec) | |
| _spec.loader.exec_module(_base_module) | |
| BaseController = _base_module.BaseController | |
| # Try to import Quadruped-PyMPC components | |
| PYMPC_AVAILABLE = False | |
| # First try local standalone implementation | |
| try: | |
| import sys | |
| import os | |
| # Get the parent directory of controllers (the spot directory) | |
| spot_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| if spot_dir not in sys.path: | |
| sys.path.insert(0, spot_dir) | |
| from quadruped_deps import LegsAttr, GaitType, PeriodicGaitGenerator, mpc_params as pympc_config | |
| PYMPC_AVAILABLE = True | |
| print("Quadruped-PyMPC components loaded from local standalone implementation") | |
| except ImportError as local_err: | |
| # Fall back to external packages | |
| try: | |
| from gym_quadruped.utils.quadruped_utils import LegsAttr | |
| from quadruped_pympc.helpers.periodic_gait_generator import PeriodicGaitGenerator | |
| from quadruped_pympc.helpers.quadruped_utils import GaitType | |
| import quadruped_pympc.config as pympc_config | |
| PYMPC_AVAILABLE = True | |
| print("Quadruped-PyMPC loaded from external packages") | |
| except ImportError as e: | |
| print(f"Quadruped-PyMPC not available (local or external): {e}") | |
| class QuadrupedPyMPCController(BaseController): | |
| """ | |
| Controller using Quadruped-PyMPC's gait generator for timing. | |
| Outputs position targets compatible with SpotEnv's position actuators. | |
| """ | |
| STANDING_POSE = np.array([ | |
| 0.0, 1.04, -1.8, | |
| 0.0, 1.04, -1.8, | |
| 0.0, 1.04, -1.8, | |
| 0.0, 1.04, -1.8, | |
| ], dtype=np.float32) | |
| # Leg indices matching PyMPC convention | |
| LEGS_ORDER = ('FL', 'FR', 'RL', 'RR') | |
| LEG_IDX = {'FL': 0, 'FR': 1, 'RL': 2, 'RR': 3} | |
| def __init__(self, num_joints: int = 12, model=None, data=None): | |
| super().__init__(num_joints) | |
| self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) | |
| self.model = model | |
| self.data = data | |
| self.pympc_initialized = False | |
| # Robot parameters (Spot) | |
| self.mass = 32.86 | |
| self.hip_height = 0.46 | |
| # Simulation parameters | |
| self.sim_dt = 0.002 | |
| # Gait parameters | |
| self.step_freq = 2.5 # Hz - even faster stepping | |
| self.duty_factor = 0.55 # 55% stance time (more swing time) | |
| # Motion parameters - SIGNIFICANTLY INCREASED | |
| self.swing_height = 0.35 # radians - big knee lift during swing | |
| self.stride_length = 0.5 # radians - big hip pitch for forward/back | |
| self.strafe_amplitude = 0.6 # radians - lateral hip roll amplitude (INCREASED) | |
| self.strafe_knee_extra = 0.2 # extra knee lift during strafe | |
| self.turn_amplitude = 0.3 # radians - turning hip roll amplitude | |
| # Feedback gains for balance | |
| self.kp_roll = 0.5 | |
| self.kp_pitch = 0.4 | |
| self.kp_height = 2.5 | |
| self.kd_roll = 0.1 | |
| self.kd_pitch = 0.1 | |
| self.target_height = 0.46 | |
| # Command smoothing - VERY FAST response | |
| self.cmd_filter = 0.6 | |
| self.filtered_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) | |
| # Initialize PyMPC components if available | |
| if PYMPC_AVAILABLE: | |
| self._init_pympc() | |
| def _init_pympc(self): | |
| """Initialize Quadruped-PyMPC gait generator.""" | |
| try: | |
| # Configure PyMPC | |
| # Handle both dict-like and module-like config objects | |
| if hasattr(pympc_config, 'mpc_params'): | |
| horizon = pympc_config.mpc_params.get('horizon', 12) | |
| else: | |
| # Using local standalone implementation | |
| horizon = pympc_config.get('horizon', 12) | |
| # Initialize periodic gait generator for trot | |
| self.gait_generator = PeriodicGaitGenerator( | |
| duty_factor=self.duty_factor, | |
| step_freq=self.step_freq, | |
| gait_type=GaitType.TROT, | |
| horizon=horizon, | |
| ) | |
| self.pympc_initialized = True | |
| print("PyMPC gait generator initialized") | |
| except Exception as e: | |
| print(f"Failed to initialize PyMPC components: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| self.pympc_initialized = False | |
| def set_command(self, vx: float = 0.0, vy: float = 0.0, vyaw: float = 0.0): | |
| """Set velocity command.""" | |
| self.cmd[0] = np.clip(vx, -1.0, 1.0) | |
| self.cmd[1] = np.clip(vy, -0.5, 0.5) | |
| self.cmd[2] = np.clip(vyaw, -1.0, 1.0) | |
| def get_command(self): | |
| """Get current velocity command.""" | |
| return self.cmd.copy() | |
| def _get_leg_phase(self, leg_name: str) -> tuple: | |
| """Get swing/stance info for a leg from PyMPC gait generator.""" | |
| leg_idx = self.LEG_IDX[leg_name] | |
| # Phase signal gives normalized phase in gait cycle [0, 1) | |
| phase = self.gait_generator.phase_signal[leg_idx] | |
| # Swing happens when phase < swing_duration | |
| swing_duration = 1.0 - self.duty_factor | |
| in_swing = phase < swing_duration | |
| # Normalize phase within current mode (swing or stance) | |
| if in_swing: | |
| # In swing: phase goes from 0 to swing_duration | |
| # Normalize to [0, 1) | |
| phase_in_mode = phase / swing_duration | |
| else: | |
| # In stance: phase goes from swing_duration to 1.0 | |
| # Normalize to [0, 1) | |
| phase_in_mode = (phase - swing_duration) / self.duty_factor | |
| return in_swing, phase_in_mode | |
| def compute_action(self, obs: np.ndarray, data) -> np.ndarray: | |
| """Compute joint targets using PyMPC gait timing.""" | |
| self.data = data | |
| # Smooth command transition | |
| self.filtered_cmd = (self.cmd_filter * self.filtered_cmd + | |
| (1 - self.cmd_filter) * self.cmd) | |
| vx, vy, vyaw = self.filtered_cmd | |
| # Check if should walk | |
| speed = np.sqrt(vx**2 + vy**2 + vyaw**2) | |
| if speed < 0.05: | |
| return self.STANDING_POSE.copy() | |
| # If PyMPC not available, use simple fallback | |
| if not self.pympc_initialized: | |
| return self._simple_gait(obs) | |
| # Update gait generator | |
| self.gait_generator.run(self.sim_dt, self.step_freq) | |
| # Extract state from observation | |
| base_quat = obs[3:7] # w, x, y, z | |
| base_ang_vel = obs[10:13] | |
| height = obs[2] | |
| # Compute roll and pitch from quaternion | |
| qw, qx, qy, qz = base_quat | |
| roll = np.arctan2(2 * (qw * qx + qy * qz), 1 - 2 * (qx**2 + qy**2)) | |
| sin_pitch = np.clip(2 * (qw * qy - qz * qx), -1.0, 1.0) | |
| pitch = np.arcsin(sin_pitch) | |
| # Angular velocities | |
| roll_rate = base_ang_vel[0] | |
| pitch_rate = base_ang_vel[1] | |
| # Height error | |
| height_error = self.target_height - height | |
| # Compute feedback corrections | |
| roll_correction = -self.kp_roll * roll - self.kd_roll * roll_rate | |
| pitch_correction = -self.kp_pitch * pitch - self.kd_pitch * pitch_rate | |
| height_correction = self.kp_height * height_error | |
| # Output array | |
| target = np.zeros(12, dtype=np.float32) | |
| for leg_name in self.LEGS_ORDER: | |
| leg_idx = self.LEG_IDX[leg_name] | |
| in_swing, phase = self._get_leg_phase(leg_name) | |
| is_front = leg_name in ['FL', 'FR'] | |
| is_left = leg_name in ['FL', 'RL'] | |
| # Base joint positions | |
| hx = 0.0 | |
| hy = 1.04 # standing hip pitch | |
| kn = -1.8 # standing knee | |
| if in_swing: | |
| # Swing phase: lift foot and swing forward | |
| swing_progress = phase | |
| # Hip pitch: swing leg forward/backward | |
| hy_offset = self.stride_length * vx * (1 - 2 * swing_progress) | |
| # Knee: lift in middle of swing (parabolic) | |
| kn_offset = -self.swing_height * np.sin(np.pi * swing_progress) | |
| # TURNING: Hip roll moves outward during swing | |
| if abs(vyaw) > 0.01: | |
| # For turning, swing leg moves outward | |
| sign = 1.0 if is_left else -1.0 | |
| # Differential: front/back legs turn opposite | |
| turn_sign = 1.0 if is_front else -1.0 | |
| hx += sign * turn_sign * self.turn_amplitude * vyaw * np.sin(np.pi * swing_progress) | |
| # Also adjust hip pitch for turning | |
| hy_offset += turn_sign * 0.15 * vyaw * (1 - 2 * swing_progress) | |
| # STRAFE: Aggressive lateral motion during swing | |
| if abs(vy) > 0.01: | |
| # Determine if this leg should step toward or away from strafe direction | |
| # When strafing right (vy < 0): right legs lead, left legs follow | |
| # When strafing left (vy > 0): left legs lead, right legs follow | |
| strafe_dir = 1.0 if vy > 0 else -1.0 | |
| is_leading_leg = (is_left and vy > 0) or (not is_left and vy < 0) | |
| # Leading leg: big outward hip roll to reach | |
| # Trailing leg: hip roll inward to push | |
| if is_leading_leg: | |
| hx += self.strafe_amplitude * abs(vy) * np.sin(np.pi * swing_progress) | |
| else: | |
| hx -= self.strafe_amplitude * 0.5 * abs(vy) * np.sin(np.pi * swing_progress) | |
| # Extra knee lift for clearance during strafe | |
| kn_offset -= self.strafe_knee_extra * abs(vy) * np.sin(np.pi * swing_progress) | |
| # Slight hip pitch adjustment for diagonal reach | |
| hy_offset += 0.1 * abs(vy) * (1 - 2 * swing_progress) | |
| hy += hy_offset | |
| kn += kn_offset | |
| else: | |
| # Stance phase: push body in desired direction | |
| stance_progress = phase | |
| # Hip pitch: push backward for forward motion | |
| hy_offset = self.stride_length * vx * (2 * stance_progress - 1) | |
| # Apply balance feedback during stance | |
| kn_offset = height_correction * 0.3 | |
| # Roll correction through hip roll | |
| if is_left: | |
| hx += roll_correction | |
| else: | |
| hx -= roll_correction | |
| # Pitch correction through hip pitch | |
| if is_front: | |
| hy_offset -= pitch_correction * 0.5 | |
| else: | |
| hy_offset += pitch_correction * 0.5 | |
| # TURNING in stance: rotate body | |
| if abs(vyaw) > 0.01: | |
| turn_sign = 1.0 if is_front else -1.0 | |
| sign = 1.0 if is_left else -1.0 | |
| # Push in opposite direction during stance | |
| hx += sign * turn_sign * self.turn_amplitude * vyaw * np.cos(np.pi * stance_progress) | |
| # STRAFE in stance: push body laterally | |
| if abs(vy) > 0.01: | |
| # In stance, push in direction of strafe | |
| is_leading_leg = (is_left and vy > 0) or (not is_left and vy < 0) | |
| # Leading leg (already placed outward): pull body toward it | |
| # Trailing leg: push body away | |
| if is_leading_leg: | |
| # Pull - hip roll decreases to bring body over | |
| hx -= self.strafe_amplitude * 0.7 * abs(vy) * (1 - stance_progress) | |
| else: | |
| # Push - hip roll increases to push body away | |
| hx += self.strafe_amplitude * 0.7 * abs(vy) * stance_progress | |
| # Also slight knee bend on trailing side for push | |
| if not is_leading_leg: | |
| kn_offset -= 0.1 * abs(vy) * stance_progress | |
| hy += hy_offset | |
| kn += kn_offset | |
| # Clamp to joint limits (from spot.xml) | |
| hx = np.clip(hx, -0.785, 0.785) | |
| hy = np.clip(hy, -0.9, 2.29) | |
| kn = np.clip(kn, -2.79, -0.25) | |
| # Set joint targets | |
| base_idx = leg_idx * 3 | |
| target[base_idx + 0] = hx | |
| target[base_idx + 1] = hy | |
| target[base_idx + 2] = kn | |
| return target | |
| def _simple_gait(self, obs): | |
| """Fallback simple trot gait when PyMPC not available.""" | |
| vx, vy, vyaw = self.filtered_cmd | |
| phase = (self.time * self.step_freq) % 1.0 | |
| phase_offsets = np.array([0.0, 0.5, 0.5, 0.0]) # Trot pattern | |
| target = np.zeros(12, dtype=np.float32) | |
| for leg_idx in range(4): | |
| leg_phase = (phase + phase_offsets[leg_idx]) % 1.0 | |
| swing_duration = 1.0 - self.duty_factor | |
| in_swing = leg_phase < swing_duration | |
| is_left = leg_idx in [0, 2] # FL, RL | |
| hx = 0.0 | |
| hy = 1.04 | |
| kn = -1.8 | |
| if in_swing: | |
| p = leg_phase / swing_duration | |
| hy += self.stride_length * vx * (1 - 2 * p) | |
| kn += -self.swing_height * np.sin(np.pi * p) | |
| else: | |
| p = (leg_phase - swing_duration) / self.duty_factor | |
| hy += self.stride_length * vx * (2 * p - 1) | |
| # Balance feedback | |
| base_quat = obs[3:7] | |
| qw, qx, qy, qz = base_quat | |
| roll = np.arctan2(2 * (qw * qx + qy * qz), 1 - 2 * (qx**2 + qy**2)) | |
| if is_left: | |
| hx -= 0.3 * roll | |
| else: | |
| hx += 0.3 * roll | |
| base_idx = leg_idx * 3 | |
| target[base_idx + 0] = np.clip(hx, -0.785, 0.785) | |
| target[base_idx + 1] = np.clip(hy, -0.9, 2.29) | |
| target[base_idx + 2] = np.clip(kn, -2.79, -0.25) | |
| return target | |
| def reset(self): | |
| super().reset() | |
| self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) | |
| self.filtered_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32) | |
| if self.pympc_initialized and hasattr(self, 'gait_generator'): | |
| self.gait_generator.reset() | |
| def name(self) -> str: | |
| status = "PyMPC" if self.pympc_initialized else "Fallback" | |
| return f"PyMPC Gait ({status}) (cmd: {self.cmd[0]:.1f}, {self.cmd[1]:.1f}, {self.cmd[2]:.1f})" | |