nova-sim / robots /spot /controllers /quadruped_pympc_controller.py
Georg
Enhance mujoco_server.py and README.md for improved teleoperation and environment management
9078db7
"""Quadruped-PyMPC inspired Controller for Spot robot.
This controller uses components from Quadruped-PyMPC for:
- Periodic gait generation
- Swing trajectory planning
But outputs joint position targets (not torques) to work with SpotEnv's
position-controlled actuators.
Requires: quadruped_pympc, jax, gym_quadruped
"""
import numpy as np
import os
import importlib.util
# Import BaseController
_controllers_dir = os.path.dirname(os.path.abspath(__file__))
_base_path = os.path.join(_controllers_dir, "base.py")
_spec = importlib.util.spec_from_file_location("spot_base", _base_path)
_base_module = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(_base_module)
BaseController = _base_module.BaseController
# Try to import Quadruped-PyMPC components
PYMPC_AVAILABLE = False
# First try local standalone implementation
try:
import sys
import os
# Get the parent directory of controllers (the spot directory)
spot_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if spot_dir not in sys.path:
sys.path.insert(0, spot_dir)
from quadruped_deps import LegsAttr, GaitType, PeriodicGaitGenerator, mpc_params as pympc_config
PYMPC_AVAILABLE = True
print("Quadruped-PyMPC components loaded from local standalone implementation")
except ImportError as local_err:
# Fall back to external packages
try:
from gym_quadruped.utils.quadruped_utils import LegsAttr
from quadruped_pympc.helpers.periodic_gait_generator import PeriodicGaitGenerator
from quadruped_pympc.helpers.quadruped_utils import GaitType
import quadruped_pympc.config as pympc_config
PYMPC_AVAILABLE = True
print("Quadruped-PyMPC loaded from external packages")
except ImportError as e:
print(f"Quadruped-PyMPC not available (local or external): {e}")
class QuadrupedPyMPCController(BaseController):
"""
Controller using Quadruped-PyMPC's gait generator for timing.
Outputs position targets compatible with SpotEnv's position actuators.
"""
STANDING_POSE = np.array([
0.0, 1.04, -1.8,
0.0, 1.04, -1.8,
0.0, 1.04, -1.8,
0.0, 1.04, -1.8,
], dtype=np.float32)
# Leg indices matching PyMPC convention
LEGS_ORDER = ('FL', 'FR', 'RL', 'RR')
LEG_IDX = {'FL': 0, 'FR': 1, 'RL': 2, 'RR': 3}
def __init__(self, num_joints: int = 12, model=None, data=None):
super().__init__(num_joints)
self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
self.model = model
self.data = data
self.pympc_initialized = False
# Robot parameters (Spot)
self.mass = 32.86
self.hip_height = 0.46
# Simulation parameters
self.sim_dt = 0.002
# Gait parameters
self.step_freq = 2.5 # Hz - even faster stepping
self.duty_factor = 0.55 # 55% stance time (more swing time)
# Motion parameters - SIGNIFICANTLY INCREASED
self.swing_height = 0.35 # radians - big knee lift during swing
self.stride_length = 0.5 # radians - big hip pitch for forward/back
self.strafe_amplitude = 0.6 # radians - lateral hip roll amplitude (INCREASED)
self.strafe_knee_extra = 0.2 # extra knee lift during strafe
self.turn_amplitude = 0.3 # radians - turning hip roll amplitude
# Feedback gains for balance
self.kp_roll = 0.5
self.kp_pitch = 0.4
self.kp_height = 2.5
self.kd_roll = 0.1
self.kd_pitch = 0.1
self.target_height = 0.46
# Command smoothing - VERY FAST response
self.cmd_filter = 0.6
self.filtered_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
# Initialize PyMPC components if available
if PYMPC_AVAILABLE:
self._init_pympc()
def _init_pympc(self):
"""Initialize Quadruped-PyMPC gait generator."""
try:
# Configure PyMPC
# Handle both dict-like and module-like config objects
if hasattr(pympc_config, 'mpc_params'):
horizon = pympc_config.mpc_params.get('horizon', 12)
else:
# Using local standalone implementation
horizon = pympc_config.get('horizon', 12)
# Initialize periodic gait generator for trot
self.gait_generator = PeriodicGaitGenerator(
duty_factor=self.duty_factor,
step_freq=self.step_freq,
gait_type=GaitType.TROT,
horizon=horizon,
)
self.pympc_initialized = True
print("PyMPC gait generator initialized")
except Exception as e:
print(f"Failed to initialize PyMPC components: {e}")
import traceback
traceback.print_exc()
self.pympc_initialized = False
def set_command(self, vx: float = 0.0, vy: float = 0.0, vyaw: float = 0.0):
"""Set velocity command."""
self.cmd[0] = np.clip(vx, -1.0, 1.0)
self.cmd[1] = np.clip(vy, -0.5, 0.5)
self.cmd[2] = np.clip(vyaw, -1.0, 1.0)
def get_command(self):
"""Get current velocity command."""
return self.cmd.copy()
def _get_leg_phase(self, leg_name: str) -> tuple:
"""Get swing/stance info for a leg from PyMPC gait generator."""
leg_idx = self.LEG_IDX[leg_name]
# Phase signal gives normalized phase in gait cycle [0, 1)
phase = self.gait_generator.phase_signal[leg_idx]
# Swing happens when phase < swing_duration
swing_duration = 1.0 - self.duty_factor
in_swing = phase < swing_duration
# Normalize phase within current mode (swing or stance)
if in_swing:
# In swing: phase goes from 0 to swing_duration
# Normalize to [0, 1)
phase_in_mode = phase / swing_duration
else:
# In stance: phase goes from swing_duration to 1.0
# Normalize to [0, 1)
phase_in_mode = (phase - swing_duration) / self.duty_factor
return in_swing, phase_in_mode
def compute_action(self, obs: np.ndarray, data) -> np.ndarray:
"""Compute joint targets using PyMPC gait timing."""
self.data = data
# Smooth command transition
self.filtered_cmd = (self.cmd_filter * self.filtered_cmd +
(1 - self.cmd_filter) * self.cmd)
vx, vy, vyaw = self.filtered_cmd
# Check if should walk
speed = np.sqrt(vx**2 + vy**2 + vyaw**2)
if speed < 0.05:
return self.STANDING_POSE.copy()
# If PyMPC not available, use simple fallback
if not self.pympc_initialized:
return self._simple_gait(obs)
# Update gait generator
self.gait_generator.run(self.sim_dt, self.step_freq)
# Extract state from observation
base_quat = obs[3:7] # w, x, y, z
base_ang_vel = obs[10:13]
height = obs[2]
# Compute roll and pitch from quaternion
qw, qx, qy, qz = base_quat
roll = np.arctan2(2 * (qw * qx + qy * qz), 1 - 2 * (qx**2 + qy**2))
sin_pitch = np.clip(2 * (qw * qy - qz * qx), -1.0, 1.0)
pitch = np.arcsin(sin_pitch)
# Angular velocities
roll_rate = base_ang_vel[0]
pitch_rate = base_ang_vel[1]
# Height error
height_error = self.target_height - height
# Compute feedback corrections
roll_correction = -self.kp_roll * roll - self.kd_roll * roll_rate
pitch_correction = -self.kp_pitch * pitch - self.kd_pitch * pitch_rate
height_correction = self.kp_height * height_error
# Output array
target = np.zeros(12, dtype=np.float32)
for leg_name in self.LEGS_ORDER:
leg_idx = self.LEG_IDX[leg_name]
in_swing, phase = self._get_leg_phase(leg_name)
is_front = leg_name in ['FL', 'FR']
is_left = leg_name in ['FL', 'RL']
# Base joint positions
hx = 0.0
hy = 1.04 # standing hip pitch
kn = -1.8 # standing knee
if in_swing:
# Swing phase: lift foot and swing forward
swing_progress = phase
# Hip pitch: swing leg forward/backward
hy_offset = self.stride_length * vx * (1 - 2 * swing_progress)
# Knee: lift in middle of swing (parabolic)
kn_offset = -self.swing_height * np.sin(np.pi * swing_progress)
# TURNING: Hip roll moves outward during swing
if abs(vyaw) > 0.01:
# For turning, swing leg moves outward
sign = 1.0 if is_left else -1.0
# Differential: front/back legs turn opposite
turn_sign = 1.0 if is_front else -1.0
hx += sign * turn_sign * self.turn_amplitude * vyaw * np.sin(np.pi * swing_progress)
# Also adjust hip pitch for turning
hy_offset += turn_sign * 0.15 * vyaw * (1 - 2 * swing_progress)
# STRAFE: Aggressive lateral motion during swing
if abs(vy) > 0.01:
# Determine if this leg should step toward or away from strafe direction
# When strafing right (vy < 0): right legs lead, left legs follow
# When strafing left (vy > 0): left legs lead, right legs follow
strafe_dir = 1.0 if vy > 0 else -1.0
is_leading_leg = (is_left and vy > 0) or (not is_left and vy < 0)
# Leading leg: big outward hip roll to reach
# Trailing leg: hip roll inward to push
if is_leading_leg:
hx += self.strafe_amplitude * abs(vy) * np.sin(np.pi * swing_progress)
else:
hx -= self.strafe_amplitude * 0.5 * abs(vy) * np.sin(np.pi * swing_progress)
# Extra knee lift for clearance during strafe
kn_offset -= self.strafe_knee_extra * abs(vy) * np.sin(np.pi * swing_progress)
# Slight hip pitch adjustment for diagonal reach
hy_offset += 0.1 * abs(vy) * (1 - 2 * swing_progress)
hy += hy_offset
kn += kn_offset
else:
# Stance phase: push body in desired direction
stance_progress = phase
# Hip pitch: push backward for forward motion
hy_offset = self.stride_length * vx * (2 * stance_progress - 1)
# Apply balance feedback during stance
kn_offset = height_correction * 0.3
# Roll correction through hip roll
if is_left:
hx += roll_correction
else:
hx -= roll_correction
# Pitch correction through hip pitch
if is_front:
hy_offset -= pitch_correction * 0.5
else:
hy_offset += pitch_correction * 0.5
# TURNING in stance: rotate body
if abs(vyaw) > 0.01:
turn_sign = 1.0 if is_front else -1.0
sign = 1.0 if is_left else -1.0
# Push in opposite direction during stance
hx += sign * turn_sign * self.turn_amplitude * vyaw * np.cos(np.pi * stance_progress)
# STRAFE in stance: push body laterally
if abs(vy) > 0.01:
# In stance, push in direction of strafe
is_leading_leg = (is_left and vy > 0) or (not is_left and vy < 0)
# Leading leg (already placed outward): pull body toward it
# Trailing leg: push body away
if is_leading_leg:
# Pull - hip roll decreases to bring body over
hx -= self.strafe_amplitude * 0.7 * abs(vy) * (1 - stance_progress)
else:
# Push - hip roll increases to push body away
hx += self.strafe_amplitude * 0.7 * abs(vy) * stance_progress
# Also slight knee bend on trailing side for push
if not is_leading_leg:
kn_offset -= 0.1 * abs(vy) * stance_progress
hy += hy_offset
kn += kn_offset
# Clamp to joint limits (from spot.xml)
hx = np.clip(hx, -0.785, 0.785)
hy = np.clip(hy, -0.9, 2.29)
kn = np.clip(kn, -2.79, -0.25)
# Set joint targets
base_idx = leg_idx * 3
target[base_idx + 0] = hx
target[base_idx + 1] = hy
target[base_idx + 2] = kn
return target
def _simple_gait(self, obs):
"""Fallback simple trot gait when PyMPC not available."""
vx, vy, vyaw = self.filtered_cmd
phase = (self.time * self.step_freq) % 1.0
phase_offsets = np.array([0.0, 0.5, 0.5, 0.0]) # Trot pattern
target = np.zeros(12, dtype=np.float32)
for leg_idx in range(4):
leg_phase = (phase + phase_offsets[leg_idx]) % 1.0
swing_duration = 1.0 - self.duty_factor
in_swing = leg_phase < swing_duration
is_left = leg_idx in [0, 2] # FL, RL
hx = 0.0
hy = 1.04
kn = -1.8
if in_swing:
p = leg_phase / swing_duration
hy += self.stride_length * vx * (1 - 2 * p)
kn += -self.swing_height * np.sin(np.pi * p)
else:
p = (leg_phase - swing_duration) / self.duty_factor
hy += self.stride_length * vx * (2 * p - 1)
# Balance feedback
base_quat = obs[3:7]
qw, qx, qy, qz = base_quat
roll = np.arctan2(2 * (qw * qx + qy * qz), 1 - 2 * (qx**2 + qy**2))
if is_left:
hx -= 0.3 * roll
else:
hx += 0.3 * roll
base_idx = leg_idx * 3
target[base_idx + 0] = np.clip(hx, -0.785, 0.785)
target[base_idx + 1] = np.clip(hy, -0.9, 2.29)
target[base_idx + 2] = np.clip(kn, -2.79, -0.25)
return target
def reset(self):
super().reset()
self.cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
self.filtered_cmd = np.array([0.0, 0.0, 0.0], dtype=np.float32)
if self.pympc_initialized and hasattr(self, 'gait_generator'):
self.gait_generator.reset()
@property
def name(self) -> str:
status = "PyMPC" if self.pympc_initialized else "Fallback"
return f"PyMPC Gait ({status}) (cmd: {self.cmd[0]:.1f}, {self.cmd[1]:.1f}, {self.cmd[2]:.1f})"