# nova-sim / mujoco_server.py
# Author: Georg — "Prepare HF Space deployment" (commit 698216c)
import os
import sys
import time
import threading
import json
import base64
import math
import re
import traceback
import io
import cv2
import numpy as np
def _configure_mujoco_backend() -> None:
if os.environ.get("NOVA_MUJOCO_GPU", "").lower() in {"1", "true", "yes", "on"}:
os.environ["MUJOCO_GL"] = os.environ.get("MUJOCO_GL", "egl")
os.environ["PYOPENGL_PLATFORM"] = os.environ.get("PYOPENGL_PLATFORM", "egl")
else:
os.environ["MUJOCO_GL"] = os.environ.get("MUJOCO_GL", "osmesa")
os.environ["PYOPENGL_PLATFORM"] = os.environ.get("PYOPENGL_PLATFORM", "osmesa")
_configure_mujoco_backend()
import mujoco
from functools import wraps
from pathlib import Path
from typing import Any, Optional
from flask import Flask, Response, render_template_string, request, jsonify, make_response
from flask_sock import Sock
# Add robots directory to path for imports
# Resolve this file's directory so sibling resources (robots/, .env.local)
# can be found regardless of the current working directory.
_nova_sim_dir = os.path.dirname(os.path.abspath(__file__))
# Temporarily expose robots/g1 on sys.path just long enough to import the
# G1 environment, then pop the entry so it does not leak to later imports.
sys.path.insert(0, os.path.join(_nova_sim_dir, 'robots', 'g1'))
from g1_env import G1Env
sys.path.pop(0)
# Load .env.local for Nova API configuration
def load_env_file(filepath):
    """Read KEY=VALUE pairs from *filepath* into os.environ.

    Blank lines, '#' comment lines, and lines without '=' are skipped;
    a missing file is silently ignored. Keys and values are stripped of
    surrounding whitespace; values keep everything after the first '='.
    """
    if not filepath.exists():
        return
    with open(filepath, 'r') as env_file:
        for raw_line in env_file:
            entry = raw_line.strip()
            if not entry or entry.startswith('#') or '=' not in entry:
                continue
            key, _, value = entry.partition('=')
            os.environ[key.strip()] = value.strip()
# Load Nova API configuration from the repo-local .env.local (if present).
env_local_path = Path(_nova_sim_dir) / '.env.local'
load_env_file(env_local_path)
# API prefix for all endpoints
API_PREFIX = '/nova-sim/api/v1'
# Determine if Nova credentials are present (required for default toggle state)
def _nova_credentials_present() -> bool:
instance_url = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API')
return all([
instance_url,
os.getenv('NOVA_ACCESS_TOKEN'),
os.getenv('NOVA_CONTROLLER_ID'),
os.getenv('NOVA_MOTION_GROUP_ID'),
])
# Flask application plus the flask-sock WebSocket extension.
app = Flask(__name__)
app.config['SECRET_KEY'] = 'robotsim-secret'
sock = Sock(app)
# Detect if running in Docker (check for /.dockerenv or cgroup)
def is_running_in_docker() -> bool:
    """Best-effort Docker detection.

    Returns True when the /.dockerenv marker exists or PID 1's cgroup
    mentions docker; False when neither probe applies (including
    platforms without /proc).
    """
    if os.path.exists('/.dockerenv'):
        return True
    try:
        with open('/proc/1/cgroup', 'r') as f:
            return 'docker' in f.read()
    except OSError:
        # Was a bare `except:` — narrowed so KeyboardInterrupt/SystemExit
        # are not swallowed. /proc is absent on macOS/Windows.
        return False
IN_DOCKER = is_running_in_docker()
# Performance settings from environment
# Native defaults: 1280x720 @ 60 FPS (GPU accelerated)
# Docker defaults: 640x360 @ 30 FPS (software rendering)
RENDER_WIDTH = int(os.environ.get('RENDER_WIDTH', 640 if IN_DOCKER else 1280))
RENDER_HEIGHT = int(os.environ.get('RENDER_HEIGHT', 360 if IN_DOCKER else 720))
TARGET_FPS = int(os.environ.get('TARGET_FPS', 30 if IN_DOCKER else 60))
# Physics substeps per rendered frame — presumably more substeps in Docker
# compensates for the lower frame rate; TODO confirm against the sim loop.
SIM_STEPS_PER_FRAME = int(os.environ.get('SIM_STEPS_PER_FRAME', 10 if IN_DOCKER else 5))
# Overlay camera render resolution; keep auxiliary streams lean.
OVERLAY_RENDER_WIDTH = int(os.environ.get('OVERLAY_RENDER_WIDTH', 240))
OVERLAY_RENDER_HEIGHT = int(os.environ.get('OVERLAY_RENDER_HEIGHT', 160))
# Current robot type
current_robot = "g1"  # "g1", "spot", "ur5", or "ur5_t_push"
current_scene = None
# Environment instances (lazy loaded)
env_g1 = None
env_spot = None
env_ur5 = None
env_ur5_t_push = None
env = None  # Active environment
# Simulation state
running = True
latest_frame = None
frame_lock = threading.Lock()
# Guards env access (reset/step/reads) across Flask handlers and sim loop.
mujoco_lock = threading.Lock()
renderer = None
needs_robot_switch = None  # Robot to switch to
episode_control_state = {
    "terminate": False,
    "truncate": False,
    "start_time": 0.0,  # Timestamp when episode started
    "max_duration": 120.0,  # Max episode duration in seconds (safety limit)
}
# Perception state - stores latest detected object poses
perception_state = {
    "poses": [],  # List of {object_id, position, orientation, confidence}
    "last_update": 0,  # Timestamp of last update
}
perception_lock = threading.Lock()
episode_control_lock = threading.Lock()
# Latest teleoperation action (for trainer state)
# Initialize with zero values instead of None so it's always a dict
last_teleop_action: dict[str, Any] = {
    "vx": 0.0,
    "vy": 0.0,
    "vz": 0.0,
    "vyaw": 0.0,
    "gripper": 0.0,  # 0-255 for UR5 gripper, 0 for robots without grippers
    # Cartesian rotation velocities (rad/s)
    "vrx": 0.0,
    "vry": 0.0,
    "vrz": 0.0,
    # Joint velocities (rad/s)
    "j1": 0.0,
    "j2": 0.0,
    "j3": 0.0,
    "j4": 0.0,
    "j5": 0.0,
    "j6": 0.0,
}
teleop_lock = threading.Lock()
last_teleop_command_time = 0.0  # Timestamp of last teleop command (for debugging)
jogging_active = False  # Track if jogging is currently active
# Homing state
homing_state = {
    "active": False,
    "current_joint": None,
    "joint_errors": [],
    "current_velocity": 0.5,  # Max velocity for homing
    "previous_error": None,
    "previous_abs_error": None,
    "stuck_count": 0,
    "is_jogging": False,
    "current_direction": None,
    "saved_velocity": None,  # Store velocity to restore after homing
}
homing_lock = threading.Lock()
jogging_pause_lock = threading.Lock()
jogging_paused = False
# WebSocket clients
ws_clients = set()
ws_clients_lock = threading.Lock()
external_ws_clients = set()  # External clients (not UI) that can send notifications
external_ws_clients_lock = threading.Lock()
client_metadata: dict = {}  # Metadata for all external clients
client_metadata_lock = threading.Lock()
# Camera state for orbit controls
cam = mujoco.MjvCamera()
cam.azimuth = 135
cam.elevation = -20
cam.distance = 3.0
cam.lookat = np.array([0, 0, 0.8])
# Camera follow mode
camera_follow = True
# Target visibility for push-T scene
t_target_visible = True
def _reset_camera_for_current_robot() -> None:
    """Reset the orbit camera to the defaults for the active robot."""
    # Common starting angles; arm scenes override them below.
    cam.azimuth = 135
    cam.elevation = -20
    if current_robot == "g1":
        cam.lookat = np.array([0.0, 0.0, 0.8])
        cam.distance = 3.0
    elif current_robot in ("ur5", "ur5_t_push"):
        cam.lookat = np.array([0.3, 0.0, 0.6])
        cam.distance = 1.8
        cam.azimuth = 150
        cam.elevation = -25
    else:
        cam.lookat = np.array([0.0, 0.0, 0.4])
        cam.distance = 2.5
def _reset_active_environment() -> None:
    """Reset the primary simulation environment and associated camera."""
    with mujoco_lock:
        if env is not None:
            env.reset()
        _reset_camera_for_current_robot()
    # Stamp the new episode for the duration watchdog and clear any
    # pending terminate/truncate flags.
    with episode_control_lock:
        episode_control_state["start_time"] = time.time()
        episode_control_state["terminate"] = False
        episode_control_state["truncate"] = False
def _schedule_robot_switch(robot: str, scene: Optional[str]) -> None:
"""Schedule a robot/scene switch for the simulation loop."""
global needs_robot_switch
needs_robot_switch = {"robot": robot, "scene": scene}
# Root directory containing per-robot model assets (robots/<name>/model).
MODEL_ROOT = Path(_nova_sim_dir) / "robots"
def discover_scene_files(robot: str) -> list[str]:
    """Return a list of available scene names for a robot.

    Scene names are the stems of robots/<robot>/model/*.xml, sorted;
    an empty list when the model directory does not exist.
    """
    model_dir = MODEL_ROOT / robot / "model"
    if not model_dir.exists():
        return []
    return [scene_file.stem for scene_file in sorted(model_dir.glob("*.xml"))]
# Scene XML files exposed per robot (names match robots/<r>/model/*.xml).
ROBOT_SCENES = {
    "ur5": ["scene_with_gripper"],
    "ur5_t_push": ["scene_t_push"],
    "g1": ["scene"],
    "spot": ["scene"],
}
# Human-readable robot labels for the UI.
ROBOT_LABELS = {
    "ur5": "Universal Robots UR5e",
    "ur5_t_push": "UR5e T-Push Scene",
    "g1": "Unitree G1 Humanoid",
    "spot": "Boston Dynamics Spot (Quadruped)",
}
# Scene used when a switch request does not name one explicitly.
DEFAULT_SCENES = {
    "ur5": "scene_with_gripper",
    "ur5_t_push": "scene_t_push",
    "g1": "scene",
    "spot": "scene",
}
# Action names accepted by the server's control interface.
AVAILABLE_ACTIONS = [
    "action",
    "reset",
    "switch_robot",
    "camera",
    "camera_follow",
    "toggle_target_visibility",
    "teleop_action",
    "arm_target",
    "gripper",
    "control_mode",
    "joint_positions",
    "arm_orientation",
    "use_orientation",
]
# Static overlay camera presets keyed by robot or scene name.
OVERLAY_CAMERA_PRESETS: dict[str, list[dict[str, Any]]] = {}
# Live overlay camera state: name -> {camera, renderer, site ids, ...}.
overlay_camera_states: dict[str, dict[str, Any]] = {}
# Latest encoded frame bytes per overlay camera (None until rendered).
overlay_frames: dict[str, bytes | None] = {}
overlay_frame_lock = threading.Lock()
overlay_camera_lock = threading.Lock()
# User-registered cameras: "<robot>:<scene>" -> {camera name -> config}.
dynamic_camera_configs: dict[str, dict[str, dict[str, Any]]] = {}
def _make_mjv_camera(config: dict[str, float]) -> mujoco.MjvCamera:
    """Build an MjvCamera from an orbit-style config dict.

    Missing keys fall back to a camera at the origin, distance 1,
    azimuth 0, elevation -30.
    """
    camera = mujoco.MjvCamera()
    camera.distance = float(config.get("distance", 1.0))
    camera.azimuth = float(config.get("azimuth", 0.0))
    camera.elevation = float(config.get("elevation", -30.0))
    camera.lookat = np.array(config.get("lookat", [0.0, 0.0, 0.0]), dtype=np.float32)
    return camera
def _build_overlay_camera_state(config: dict[str, Any], create_renderer: bool = True) -> dict[str, Any]:
    """Assemble the runtime state dict for one overlay camera.

    Bundles the MjvCamera, an optional off-screen renderer, and resolved
    site ids for the follow/look-at targets. Unresolvable or unset site
    names yield id -1.
    """
    camera = _make_mjv_camera(config)

    offscreen = None
    if create_renderer and env is not None:
        offscreen = mujoco.Renderer(
            env.model,
            height=OVERLAY_RENDER_HEIGHT,
            width=OVERLAY_RENDER_WIDTH,
        )

    follow_site = config.get("follow_site")
    follow_id = -1
    if follow_site and env is not None:
        follow_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, follow_site)

    look_site = config.get("look_target_site")
    look_id = -1
    if look_site and env is not None:
        look_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, look_site)

    return {
        "camera": camera,
        "renderer": offscreen,
        "label": config.get("label", config["name"]),
        "follow_site": follow_site,
        "site_id": follow_id,
        "offset": np.array(config.get("offset", [0.0, 0.0, 0.0]), dtype=np.float32),
        "forward_offset": float(config.get("forward_offset", 0.0)),
        "track_orientation": bool(config.get("track_orientation")),
        "look_target_site": look_site,
        "look_target_id": look_id,
    }
def _normalize_vec(vec: np.ndarray) -> np.ndarray:
norm = np.linalg.norm(vec)
if norm < 1e-8:
return vec
return vec / norm
def _rotation_matrix_to_quaternion(mat: np.ndarray) -> dict[str, float]:
"""Convert a 3x3 rotation matrix to quaternion (w, x, y, z)."""
m00, m01, m02 = mat[0, 0], mat[0, 1], mat[0, 2]
m10, m11, m12 = mat[1, 0], mat[1, 1], mat[1, 2]
m20, m21, m22 = mat[2, 0], mat[2, 1], mat[2, 2]
trace = m00 + m11 + m22
if trace > 0.0:
s = math.sqrt(trace + 1.0) * 2.0
w = 0.25 * s
x = (m21 - m12) / s
y = (m02 - m20) / s
z = (m10 - m01) / s
elif m00 > m11 and m00 > m22:
s = math.sqrt(1.0 + m00 - m11 - m22) * 2.0
w = (m21 - m12) / s
x = 0.25 * s
y = (m01 + m10) / s
z = (m02 + m20) / s
elif m11 > m22:
s = math.sqrt(1.0 + m11 - m00 - m22) * 2.0
w = (m02 - m20) / s
x = (m01 + m10) / s
y = 0.25 * s
z = (m12 + m21) / s
else:
s = math.sqrt(1.0 + m22 - m00 - m11) * 2.0
w = (m10 - m01) / s
x = (m02 + m20) / s
y = (m12 + m21) / s
z = 0.25 * s
return {"w": float(w), "x": float(x), "y": float(y), "z": float(z)}
def _camera_axes_from_orbit(cam_obj: mujoco.MjvCamera) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Return (right, up, forward) axes for an orbit camera in world coordinates."""
    az = math.radians(float(cam_obj.azimuth))
    el = math.radians(float(cam_obj.elevation))
    sin_az, cos_az = math.sin(az), math.cos(az)
    sin_el, cos_el = math.sin(el), math.cos(el)
    right = np.array([sin_az, -cos_az, 0.0], dtype=np.float32)
    up = np.array([-cos_az * sin_el, -sin_az * sin_el, cos_el], dtype=np.float32)
    forward = np.array([-cos_az * cos_el, -sin_az * cos_el, -sin_el], dtype=np.float32)
    return _normalize_vec(right), _normalize_vec(up), _normalize_vec(forward)
def _camera_pose(cam_obj: mujoco.MjvCamera) -> dict[str, dict[str, float]]:
    """Compute camera pose (position + orientation) in world coordinates.

    Position is the lookat point backed off along the forward axis by
    the orbit distance; orientation comes from the (right, up, forward)
    frame converted to a quaternion.
    """
    right, up, forward = _camera_axes_from_orbit(cam_obj)
    focus = np.array(cam_obj.lookat, dtype=np.float32)
    eye = focus - forward * float(cam_obj.distance)
    quat = _rotation_matrix_to_quaternion(np.column_stack((right, up, forward)))
    return {
        "position": {"x": float(eye[0]), "y": float(eye[1]), "z": float(eye[2])},
        "orientation": quat,
    }
def _camera_fovy_degrees(cam_obj: mujoco.MjvCamera, env_obj) -> float:
    """Resolve a camera's vertical FOV in degrees, with fallbacks.

    Order: the camera's own positive fovy, then the model's global
    fovy, finally a 45-degree default.
    """
    fovy = float(getattr(cam_obj, "fovy", 0.0) or 0.0)
    if fovy <= 0.0 and env_obj is not None:
        fovy = float(env_obj.model.vis.global_.fovy)
    if fovy > 0.0:
        return float(fovy)
    return 45.0
def _camera_intrinsics(width: int, height: int, fovy_degrees: float) -> dict[str, float]:
"""Compute pinhole intrinsics from image size and vertical field-of-view."""
fovy_rad = math.radians(float(fovy_degrees))
fy = (float(height) / 2.0) / math.tan(fovy_rad / 2.0)
fx = fy
return {
"fx": float(fx),
"fy": float(fy),
"cx": float(width) / 2.0,
"cy": float(height) / 2.0,
"width": int(width),
"height": int(height),
"fovy_degrees": float(fovy_degrees),
}
def _get_site_forward(env_obj, site_id: int) -> np.ndarray:
"""Compute the forward (X axis) vector for a site."""
default = np.array([0.0, 0.0, 1.0], dtype=np.float32)
if env_obj is None or site_id < 0:
return default
if site_id >= getattr(env_obj.model, "nsite", 0):
return default
xmat = getattr(env_obj.data, "site_xmat", None)
if xmat is None or xmat.size < (site_id + 1) * 9:
return default
start = site_id * 9
mat = xmat[start:start + 9]
if mat.size < 9:
return default
try:
forward = np.array([mat[0], mat[3], mat[6]], dtype=np.float32)
except IndexError:
return default
norm = np.linalg.norm(forward)
if norm < 1e-6:
return default
return forward / norm
def _close_overlay_renderers():
    """Tear down all overlay renderers and drop their cached frames."""
    global overlay_camera_states
    with overlay_camera_lock:
        for cam_state in overlay_camera_states.values():
            active = cam_state.get("renderer")
            if active:
                active.close()
        overlay_camera_states = {}
    with overlay_frame_lock:
        overlay_frames.clear()
def _camera_scope_key(robot_type: str | None, scene_name: str | None) -> str:
return f"{robot_type or ''}:{scene_name or ''}"
def _get_dynamic_camera_configs(robot_type: str | None, scene_name: str | None) -> list[dict[str, Any]]:
    """Return user-registered camera configs for the robot/scene scope."""
    scoped = dynamic_camera_configs.get(_camera_scope_key(robot_type, scene_name), {})
    return list(scoped.values())
def _store_dynamic_camera_config(robot_type: str | None, scene_name: str | None, config: dict[str, Any]) -> None:
    """Persist a dynamic camera config under its scope, keyed by camera name."""
    scope = _camera_scope_key(robot_type, scene_name)
    dynamic_camera_configs.setdefault(scope, {})[config["name"]] = config
def prepare_overlay_renderers(robot_type: str, scene_name: str | None):
    """Create overlay renderers for the active robot/scene (if configured).

    Preset configs are looked up by robot first, then by scene; dynamic
    (user-registered) configs override presets with the same name.
    """
    _close_overlay_renderers()
    if robot_type and robot_type in OVERLAY_CAMERA_PRESETS:
        configs = OVERLAY_CAMERA_PRESETS[robot_type]
    elif scene_name and scene_name in OVERLAY_CAMERA_PRESETS:
        configs = OVERLAY_CAMERA_PRESETS[scene_name]
    else:
        configs = []
    dynamic_configs = _get_dynamic_camera_configs(robot_type, scene_name)
    if dynamic_configs:
        by_name: dict[str, dict[str, Any]] = {}
        for cfg in configs:
            by_name[cfg["name"]] = cfg
        for cfg in dynamic_configs:
            by_name[cfg["name"]] = cfg
        configs = list(by_name.values())
    for cfg in configs:
        payload = _build_overlay_camera_state(cfg)
        with overlay_camera_lock:
            overlay_camera_states[cfg["name"]] = payload
        with overlay_frame_lock:
            overlay_frames[cfg["name"]] = None
def _parse_bool(value):
if value is None:
return False
return str(value).strip().lower() in ("1", "true", "yes", "on")
def _coerce_vec3(value: Any, default: list[float]) -> list[float]:
if value is None:
return default
if isinstance(value, dict):
value = [value.get("x"), value.get("y"), value.get("z")]
if not isinstance(value, (list, tuple)) or len(value) != 3:
raise ValueError("Expected a 3-element vector")
return [float(value[0]), float(value[1]), float(value[2])]
def _valid_camera_name(name: str) -> bool:
if not name:
return False
if name == "main":
return False
return re.match(r"^[A-Za-z0-9_-]+$", name) is not None
def _build_dynamic_camera_config(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a camera-creation payload and fill gaps from the main cam.

    Raises:
        ValueError: for an invalid camera name or malformed vectors.
    """
    name = str(payload.get("name", "")).strip()
    if not _valid_camera_name(name):
        raise ValueError("Invalid camera name. Use letters, numbers, hyphens, or underscores.")
    label = str(payload.get("label") or name).strip()
    # Defaults mirror the current main-view orbit parameters.
    config: dict[str, Any] = {
        "name": name,
        "label": label,
        "lookat": _coerce_vec3(
            payload.get("lookat"),
            [float(cam.lookat[0]), float(cam.lookat[1]), float(cam.lookat[2])],
        ),
        "distance": float(payload.get("distance", float(cam.distance))),
        "azimuth": float(payload.get("azimuth", float(cam.azimuth))),
        "elevation": float(payload.get("elevation", float(cam.elevation))),
    }
    for site_key in ("follow_site", "look_target_site"):
        if payload.get(site_key) is not None:
            config[site_key] = str(payload.get(site_key))
    if payload.get("offset") is not None:
        config["offset"] = _coerce_vec3(payload.get("offset"), [0.0, 0.0, 0.0])
    if payload.get("forward_offset") is not None:
        config["forward_offset"] = float(payload.get("forward_offset", 0.0))
    if payload.get("track_orientation") is not None:
        config["track_orientation"] = bool(payload.get("track_orientation"))
    return config
def _load_nova_ur5_config_from_env():
    """Build the Nova API config dict for the UR5 from environment vars.

    Returns None when neither state streaming nor IK is requested, or
    when a required credential is missing (which is logged to stdout).
    """
    use_state = _parse_bool(os.environ.get("NOVA_UR5_USE_STATE_STREAM") or os.environ.get("NOVA_USE_STATE_STREAM"))
    use_ik = _parse_bool(os.environ.get("NOVA_UR5_USE_IK") or os.environ.get("NOVA_USE_IK"))
    if not (use_state or use_ik):
        return None
    required = {
        "NOVA_INSTANCE_URL": os.environ.get("NOVA_INSTANCE_URL"),
        "NOVA_ACCESS_TOKEN": os.environ.get("NOVA_ACCESS_TOKEN"),
        "NOVA_CELL_ID": os.environ.get("NOVA_CELL_ID"),
        "NOVA_CONTROLLER_ID": os.environ.get("NOVA_CONTROLLER_ID"),
        "NOVA_MOTION_GROUP_ID": os.environ.get("NOVA_MOTION_GROUP_ID"),
    }
    missing = [key for key, value in required.items() if not value]
    if missing:
        print(f"Nova API integration disabled; missing env vars: {', '.join(missing)}")
        return None
    return {
        "instance_url": required["NOVA_INSTANCE_URL"],
        "access_token": required["NOVA_ACCESS_TOKEN"],
        "cell_id": required["NOVA_CELL_ID"],
        "controller_id": required["NOVA_CONTROLLER_ID"],
        "motion_group_id": required["NOVA_MOTION_GROUP_ID"],
        "motion_group_model": os.environ.get("NOVA_MOTION_GROUP_MODEL"),
        "tcp_name": os.environ.get("NOVA_TCP_NAME"),
        "response_rate_ms": int(os.environ.get("NOVA_RESPONSE_RATE_MS", "200")),
        "use_state_stream": use_state,
        "use_ik": use_ik,
    }
def init_g1():
    """Return the lazily-created G1 humanoid environment singleton."""
    global env_g1
    if env_g1 is None:
        # First use: construct, tag the scene, and settle with a reset.
        env_g1 = G1Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT)
        env_g1.scene_name = "scene"
        env_g1.reset()
    else:
        # Already constructed — just refresh the scene tag.
        env_g1.scene_name = "scene"
    return env_g1
def init_spot():
    """Return the lazily-created Spot environment with PyMPC control."""
    global env_spot
    if env_spot is None:
        # Import SpotEnv via a temporary sys.path entry for robots/spot.
        spot_dir = os.path.join(_nova_sim_dir, 'robots', 'spot')
        sys.path.insert(0, spot_dir)
        from spot_env import SpotEnv
        sys.path.pop(0)
        # Use pympc controller if available, otherwise fallback to mpc_gait
        env_spot = SpotEnv(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, controller_type='pympc')
        env_spot.scene_name = "scene"
        env_spot.reset()
    else:
        env_spot.scene_name = "scene"
    return env_spot
def init_ur5(scene_name=DEFAULT_SCENES["ur5"]):
    """Initialize UR5e environment.

    Returns the lazily-created singleton for the requested scene:
    env_ur5_t_push for "scene_t_push", env_ur5 otherwise.
    """
    global env_ur5, env_ur5_t_push
    # Import UR5Env from robots/ur5 directory
    ur5_dir = os.path.join(_nova_sim_dir, 'robots', 'ur5')
    sys.path.insert(0, ur5_dir)
    from ur5_env import UR5Env
    sys.path.pop(0)
    # Check if Nova API is configured via environment variables
    nova_config = None
    if os.getenv('NOVA_INSTANCE_URL') and os.getenv('NOVA_ACCESS_TOKEN'):
        # Nova API is configured - attempt bidirectional control
        # State streaming: Mirror real robot pose while still running physics for objects (T-shape)
        # Jogging: UI can send directional jog commands to control the real robot
        # Note: If Nova connection fails, UR5Env will automatically fall back to internal IK
        print("Nova API credentials detected - attempting bidirectional control")
        nova_config = {
            "use_state_stream": True,  # Attempt to mirror real robot pose
            "use_ik": False,  # Use jogging API instead of Nova IK (falls back to internal IK if connection fails)
            "use_jogging": True  # Attempt bidirectional robot control
        }
    if scene_name == "scene_t_push":
        if env_ur5_t_push is None:
            env_ur5_t_push = UR5Env(
                render_mode="rgb_array",
                width=RENDER_WIDTH,
                height=RENDER_HEIGHT,
                scene_name="scene_t_push",
                nova_api_config=nova_config
            )
            env_ur5_t_push.reset()
        # NOTE(review): indentation reconstructed — return assumed at this
        # level so an existing env is returned rather than None; confirm.
        return env_ur5_t_push
    if env_ur5 is None:
        env_ur5 = UR5Env(
            render_mode="rgb_array",
            width=RENDER_WIDTH,
            height=RENDER_HEIGHT,
            nova_api_config=nova_config
        )
        env_ur5.reset()
    return env_ur5
def switch_robot(robot_type, scene_name=None):
    """Switch to a different robot.

    Tears down the current renderer, activates (lazily creating) the
    requested robot's environment, repositions the orbit camera, resets
    the episode watchdog and teleop gripper state, and rebuilds the
    main + overlay renderers. No-op when *robot_type* is already active.
    Unknown robot types are logged and ignored.
    """
    global env, current_robot, current_scene, renderer, cam
    active_scene = None
    if robot_type == current_robot and env is not None:
        return
    # Close old renderer
    if renderer is not None:
        renderer.close()
        renderer = None
    if robot_type == "g1":
        env = init_g1()
        cam.lookat = np.array([0, 0, 0.8])
        cam.distance = 3.0
    elif robot_type == "spot":
        env = init_spot()
        cam.lookat = np.array([0, 0, 0.4])
        cam.distance = 2.5
    elif robot_type == "ur5":
        selected_scene = scene_name or DEFAULT_SCENES.get("ur5")
        env = init_ur5(selected_scene)
        active_scene = selected_scene
        cam.lookat = np.array([0.3, 0, 0.6])
        cam.distance = 1.8
        cam.azimuth = 150
        cam.elevation = -25
    elif robot_type == "ur5_t_push":
        selected_scene = scene_name or "scene_t_push"
        env = init_ur5(selected_scene)
        active_scene = selected_scene
        cam.lookat = np.array([0.5, 0, 0.55])
        cam.distance = 1.9
        cam.azimuth = 150
        cam.elevation = -25
    else:
        print(f"Unknown robot type: {robot_type}")
        return
    current_robot = robot_type
    current_scene = active_scene
    env.reset()
    # Track episode start time for duration watchdog
    with episode_control_lock:
        episode_control_state["start_time"] = time.time()
        episode_control_state["terminate"] = False
        episode_control_state["truncate"] = False
    # Initialize gripper value in teleop_action for UR5
    with teleop_lock:
        if robot_type in ("ur5", "ur5_t_push"):
            has_gripper = getattr(env, "has_gripper", False)
            if has_gripper:
                # Get current gripper value from env
                gripper_val = getattr(env, "get_gripper", lambda: 128)()
                last_teleop_action["gripper"] = float(gripper_val)
            else:
                last_teleop_action["gripper"] = 0.0
        else:
            # Locomotion robots don't have grippers
            last_teleop_action["gripper"] = 0.0
    # Create new renderer
    renderer = mujoco.Renderer(env.model, height=env.height, width=env.width)
    prepare_overlay_renderers(robot_type, active_scene)
# Initialize with G1
# Eagerly create the default environment at import time so the first
# frame can render immediately.
env = init_g1()
def _get_scene_objects():
    """Extract scene objects (position and orientation) from the environment.

    Returns a list of {name, position{x,y,z}, orientation{w,x,y,z}} dicts.
    Only UR5 scenes currently expose tracked objects: the push-T pieces
    (t_object/t_target, cached on the env as body ids) plus an optional
    free 'box' body looked up by name.
    """
    if env is None:
        return []
    scene_objects = []
    if current_robot not in ("ur5", "ur5_t_push"):
        return scene_objects

    def _body_entry(name, body_id):
        # World position comes from data.xpos; MuJoCo stores orientations
        # in data.xquat as [w, x, y, z].
        pos = env.data.xpos[body_id]
        quat = env.data.xquat[body_id]
        return {
            'name': name,
            'position': {'x': float(pos[0]), 'y': float(pos[1]), 'z': float(pos[2])},
            'orientation': {'w': float(quat[0]), 'x': float(quat[1]), 'y': float(quat[2]), 'z': float(quat[3])}
        }

    # Push-T bodies: id attributes default to -1 when the scene lacks them.
    for name, id_attr in (('t_object', 't_object_body_id'), ('t_target', 't_target_body_id')):
        body_id = getattr(env, id_attr, -1)
        if body_id >= 0:
            scene_objects.append(_body_entry(name, body_id))
    try:
        box_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_BODY, "box")
        if box_id >= 0:
            scene_objects.append(_body_entry('box', box_id))
    except Exception:
        # Was a bare `except:` — narrowed so KeyboardInterrupt/SystemExit
        # propagate; a failed lookup simply omits the box entry.
        pass
    return scene_objects
def broadcast_state():
    """Broadcast robot state to all connected WebSocket clients.

    Snapshots the active environment (UR5 arms get an end-effector /
    joint-centric payload; G1/Spot get base pose), attaches teleop,
    reward, scene-object, and client-roster info, then fans the JSON
    message out to every UI WebSocket, pruning dead connections.
    """
    # NOTE(review): indentation reconstructed from mangled source — the
    # mujoco_lock is assumed to cover all env reads through message
    # construction; confirm the original lock scope.
    with mujoco_lock:
        if env is None:
            return
        obs = env._get_obs()
        cmd = env.get_command()
        steps = env.steps
        # NOTE(review): `command` is built but never referenced below —
        # appears to be an unused leftover; confirm before removing.
        command = {
            "vx": float(cmd[0]) if len(cmd) > 0 else 0.0,
            "vy": float(cmd[1]) if len(cmd) > 1 else 0.0,
            "vyaw": float(cmd[2]) if len(cmd) > 2 else 0.0,
            "vz": 0.0,
        }
        with teleop_lock:
            teleop_snapshot = last_teleop_action.copy()
        reward_value = None
        if hasattr(env, "get_task_reward"):
            reward_value = env.get_task_reward()
        # Build lists of identified and unidentified clients
        with client_metadata_lock:
            identified_clients = []
            unidentified_clients = []
            for ws, info in list(client_metadata.items()):
                identity = info.get("identity")
                if identity:
                    # Client has identified themselves
                    if ws in external_ws_clients:
                        identified_clients.append(identity)
                else:
                    # Client connected but hasn't sent identity yet
                    unidentified_clients.append(f"unidentified-{id(ws) % 10000}")
        # Get scene objects
        scene_objects = _get_scene_objects()
        # UR5 has different state structure
        if current_robot in ("ur5", "ur5_t_push"):
            ee_pos = env.get_end_effector_pos()
            ee_quat = env.get_end_effector_quat()
            target = env.get_target()
            target_euler = env.get_target_orientation()
            gripper = env.get_gripper()
            joint_pos = env.get_joint_positions()
            joint_targets = env.get_joint_targets()
            control_mode = env.get_control_mode()
            # Check Nova API status
            nova_client = getattr(env, '_nova_client', None)
            nova_state_streaming = getattr(env, '_use_nova_state_stream', False)
            nova_ik = getattr(env, '_use_nova_ik', False)
            # Check if actually connected (WebSocket active and receiving data)
            nova_connected = False
            if nova_client is not None:
                if hasattr(nova_client, 'is_state_stream_connected'):
                    # Check actual stream connection status
                    nova_connected = nova_client.is_state_stream_connected()
                elif hasattr(nova_client, 'is_connected'):
                    # Fallback: check general connection status
                    nova_connected = nova_client.is_connected()
                else:
                    # If client exists but we can't verify connection, assume connected
                    # This handles the IK-only case where state streaming isn't used
                    nova_connected = True
            nova_available = False
            nova_enabled_pref = False
            has_nova_conf = getattr(env, 'has_nova_configuration', None)
            if callable(has_nova_conf):
                nova_available = has_nova_conf()
            is_nova_enabled = getattr(env, 'is_nova_enabled', None)
            if callable(is_nova_enabled):
                nova_enabled_pref = is_nova_enabled()
            state_msg = json.dumps({
                'type': 'state',
                'data': {
                    'robot': current_robot,
                    'scene': current_scene,
                    'observation': {
                        'end_effector': {'x': float(ee_pos[0]), 'y': float(ee_pos[1]), 'z': float(ee_pos[2])},
                        'ee_orientation': {'w': float(ee_quat[0]), 'x': float(ee_quat[1]), 'y': float(ee_quat[2]), 'z': float(ee_quat[3])},
                        'ee_target': {'x': float(target[0]), 'y': float(target[1]), 'z': float(target[2])},
                        'ee_target_orientation': {'roll': float(target_euler[0]), 'pitch': float(target_euler[1]), 'yaw': float(target_euler[2])},
                        'gripper': float(gripper),
                        'joint_positions': [float(j) for j in joint_pos],
                        'joint_targets': [float(j) for j in joint_targets],
                    },
                    'control_mode': control_mode,
                    'steps': int(steps),
                    'reward': reward_value,
                    'teleop_action': teleop_snapshot,
                    'scene_objects': scene_objects,
                    'nova_api': {
                        'connected': nova_connected,
                        'state_streaming': nova_state_streaming,
                        'ik': nova_ik,
                        'available': nova_available,
                        'enabled': nova_enabled_pref
                    },
                    'connected_clients': identified_clients,
                    'unidentified_clients': unidentified_clients
                }
            })
        else:
            # Locomotion robots (G1, Spot)
            base_pos = obs[0:3]  # x, y, z position
            base_quat = obs[3:7]  # quaternion orientation (w, x, y, z)
            state_msg = json.dumps({
                'type': 'state',
                'data': {
                    'robot': current_robot,
                    'scene': current_scene,
                    'observation': {
                        'position': {'x': float(base_pos[0]), 'y': float(base_pos[1]), 'z': float(base_pos[2])},
                        'orientation': {'w': float(base_quat[0]), 'x': float(base_quat[1]), 'y': float(base_quat[2]), 'z': float(base_quat[3])},
                    },
                    'steps': int(steps),
                    'reward': reward_value,
                    'teleop_action': teleop_snapshot,
                    'scene_objects': scene_objects,
                    'connected_clients': identified_clients,
                    'unidentified_clients': unidentified_clients
                }
            })
    # Send to all connected clients
    with ws_clients_lock:
        dead_clients = set()
        for ws in ws_clients:
            try:
                ws.send(state_msg)
            except Exception:
                dead_clients.add(ws)
        # Remove dead clients
        ws_clients.difference_update(dead_clients)
def broadcast_camera_event(action: str, camera_payload: dict[str, Any], scope: dict[str, Any]) -> None:
    """Announce camera changes via the state stream."""
    envelope = json.dumps({
        "type": "state",
        "data": {
            "camera_event": {
                "action": action,
                "camera": camera_payload,
                "scope": scope,
                "timestamp": time.time(),
            }
        }
    })
    with ws_clients_lock:
        stale = set()
        for client in ws_clients:
            try:
                client.send(envelope)
            except Exception:
                stale.add(client)
        ws_clients.difference_update(stale)
def _handle_external_client_message(ws, data):
    """Process message payloads originating from external clients."""
    msg_type = data.get("type")
    # Identity announcement ("trainer_identity" kept for backward compat).
    if msg_type in ("client_identity", "trainer_identity"):
        payload = data.get("data", {}) or {}
        identity = (
            payload.get("client_id")
            or payload.get("trainer_id")
            or payload.get("name")
            or "client"
        )
        with external_ws_clients_lock:
            external_ws_clients.add(ws)
        # The client was already registered on connection; just attach
        # the identity it announced.
        _set_client_identity(ws, identity)
        broadcast_state()
        broadcast_connected_clients_status()
        return
    # Notification fan-out ("notification" kept for backward compat).
    if msg_type in ("client_notification", "notification"):
        payload = data.get("data", {})
        payload.setdefault("timestamp", time.time())
        broadcast_notification_to_all(payload)
def _build_connected_clients_payload():
    """Build a summary payload describing connected external clients."""
    identified = []
    unidentified = []
    with client_metadata_lock:
        for ws, info in client_metadata.items():
            identity = info.get("identity")
            if identity:
                # Only report identities that completed the external handshake.
                if ws in external_ws_clients:
                    identified.append(identity)
            else:
                # Connected but no identity message received yet.
                unidentified.append(f"unidentified-{id(ws) % 10000}")
    return {
        "clients": identified,
        "unidentified_clients": unidentified,
    }
def _register_external_client(ws):
    """Record a newly connected client; identity arrives in a later message."""
    with client_metadata_lock:
        client_metadata[ws] = {"connected_at": time.time(), "identity": None}
def _unregister_external_client(ws):
    """Drop a client's metadata on disconnect; a no-op if never registered."""
    with client_metadata_lock:
        client_metadata.pop(ws, None)
def _set_client_identity(ws, identity: str):
    """Attach an identity string to a registered client's metadata."""
    now = time.time()
    with client_metadata_lock:
        info = client_metadata.get(ws)
        if not info:
            # Not registered on connection (legacy flow) — create the
            # record now for backward compatibility.
            client_metadata[ws] = {"identity": identity, "connected_at": now, "last_seen": now}
        else:
            info["identity"] = identity
            info["last_seen"] = now
def broadcast_connected_clients_status():
    """Notify all clients about connected external clients.

    Clients whose send fails are pruned from the registry.
    """
    payload = _build_connected_clients_payload()
    message = json.dumps({"type": "connected_clients", "data": payload})
    with ws_clients_lock:
        dead_clients = set()
        for client in ws_clients:
            try:
                client.send(message)
            except Exception:
                # Was a bare `except:` — narrowed to match the sibling
                # broadcast helpers so KeyboardInterrupt/SystemExit propagate.
                dead_clients.add(client)
        ws_clients.difference_update(dead_clients)
def broadcast_to_external_clients(event_type: str, payload: dict):
    """Broadcast a structured event to connected external WebSocket clients."""
    message = json.dumps({"type": event_type, "data": payload})
    with external_ws_clients_lock:
        unreachable = set()
        for client in external_ws_clients:
            try:
                client.send(message)
            except Exception:
                unreachable.add(client)
        external_ws_clients.difference_update(unreachable)
def broadcast_notification_to_all(payload: dict):
    """Send a notification message (from external client) to all clients."""
    notification = json.dumps({"type": "client_notification", "data": payload})
    with ws_clients_lock:
        unreachable = set()
        for client in ws_clients:
            try:
                client.send(notification)
            except Exception:
                unreachable.add(client)
        ws_clients.difference_update(unreachable)
def _safe_ws_send(ws, message: dict):
"""Send JSON message over WebSocket without raising."""
try:
ws.send(json.dumps(message))
except Exception:
pass
def _signal_episode_control(action: str):
    """Set episode control flags and notify trainer/UI clients.

    Accepts "terminate" or "truncate" (case-insensitive); any other value
    is ignored. Broadcasts an episode_event, resets the UI environment so
    the user can continue without a manual reset, and restarts the
    episode timer with both flags cleared.
    """
    normalized = (action or "").lower()
    if normalized not in ("terminate", "truncate"):
        return
    # Raise the requested flag and clear the episode timer.
    with episode_control_lock:
        episode_control_state[normalized] = True
        episode_control_state["start_time"] = 0.0
    event = {
        "action": normalized,
        "status": "terminated" if normalized == "terminate" else "truncated",
        "source": "ui",
        "timestamp": time.time(),
    }
    broadcast_to_external_clients("episode_event", event)
    # Reset the UI environment in place.
    with mujoco_lock:
        if env is not None:
            env.reset()
    # A fresh episode starts now: restart the timer and clear both flags.
    with episode_control_lock:
        episode_control_state["start_time"] = time.time()
        episode_control_state["terminate"] = False
        episode_control_state["truncate"] = False
def _consume_episode_control_flags():
    """Return and clear the active episode control flags.

    Returns:
        (terminate, truncate) booleans; both flags are reset to False.
    """
    with episode_control_lock:
        flags = (
            episode_control_state.get("terminate", False),
            episode_control_state.get("truncate", False),
        )
        episode_control_state["terminate"] = False
        episode_control_state["truncate"] = False
    return flags
def _quat_to_mat(quat):
"""Convert quaternion (w, x, y, z) to 3x3 rotation matrix.
Args:
quat: dict with keys 'w', 'x', 'y', 'z' or tuple/list [w, x, y, z]
Returns:
3x3 rotation matrix as numpy array
"""
if isinstance(quat, dict):
w, x, y, z = quat['w'], quat['x'], quat['y'], quat['z']
else:
w, x, y, z = quat
# Normalize quaternion
norm = math.sqrt(w*w + x*x + y*y + z*z)
if norm < 1e-10:
return np.eye(3)
w, x, y, z = w/norm, x/norm, y/norm, z/norm
# Convert to rotation matrix
mat = np.array([
[1 - 2*(y*y + z*z), 2*(x*y - w*z), 2*(x*z + w*y)],
[ 2*(x*y + w*z), 1 - 2*(x*x + z*z), 2*(y*z - w*x)],
[ 2*(x*z - w*y), 2*(y*z + w*x), 1 - 2*(x*x + y*y)]
])
return mat
def _render_perception_frames(renderer_obj, model, data):
    """Render bounding boxes for detected object poses.

    Draws boxes at detected poses, color-coded by confidence:
    green (>0.8), yellow (0.5-0.8), red (<0.5). Perception data older
    than 1 second is not drawn. Malformed pose entries are skipped
    silently on purpose (best-effort overlay).

    Args:
        renderer_obj: mujoco.Renderer whose scene receives the extra geoms.
        model: MuJoCo model (currently unused; kept for interface stability).
        data: MuJoCo data (currently unused; kept for interface stability).
    """
    with perception_lock:
        poses = perception_state.get("poses", [])
        last_update = perception_state.get("last_update", 0)
    # Don't render if data is too old (> 1 second)
    if time.time() - last_update > 1.0:
        return
    scene = renderer_obj.scene
    for pose in poses:
        # Bug fix: check capacity BEFORE writing into scene.geoms. The old
        # code indexed scene.geoms[scene.ngeom] first and only checked for
        # overflow afterwards, which could index past the end on a full scene.
        if scene.ngeom >= scene.maxgeom - 10:
            break
        try:
            pos = pose.get("position", {})
            orientation = pose.get("orientation", {})
            confidence = pose.get("confidence", 0.0)
            dimensions = pose.get("dimensions")
            # Skip if no dimensions provided
            if not dimensions:
                continue
            # Extract position
            px = float(pos.get("x", 0.0))
            py = float(pos.get("y", 0.0))
            pz = float(pos.get("z", 0.0))
            # Extract dimensions (dict or 3-sequence)
            if isinstance(dimensions, dict):
                width = float(dimensions.get("width", 0.05))
                height = float(dimensions.get("height", 0.05))
                depth = float(dimensions.get("depth", 0.05))
            elif isinstance(dimensions, (list, tuple)):
                width, height, depth = float(dimensions[0]), float(dimensions[1]), float(dimensions[2])
            else:
                continue
            # Color based on confidence: high=green, medium=yellow, low=red
            if confidence > 0.8:
                rgba = [0.0, 1.0, 0.0, 0.5]  # Green
            elif confidence > 0.5:
                rgba = [1.0, 1.0, 0.0, 0.5]  # Yellow
            else:
                rgba = [1.0, 0.0, 0.0, 0.5]  # Red
            # Convert quaternion to rotation matrix
            rot_mat = _quat_to_mat(orientation)
            # MuJoCo box size is half-extents, so divide by 2
            mujoco.mjv_initGeom(
                scene.geoms[scene.ngeom],
                type=mujoco.mjtGeom.mjGEOM_BOX,
                size=[width / 2, height / 2, depth / 2],
                pos=[px, py, pz],
                mat=rot_mat.flatten(),
                rgba=rgba
            )
            scene.ngeom += 1
        except Exception:
            # Deliberate best-effort: skip invalid poses.
            continue
def simulation_loop():
    """Main physics/render loop run on a dedicated daemon thread.

    Each iteration: applies any pending robot switch, runs the episode
    watchdog, steps physics, updates the follow camera, renders the main
    and overlay camera frames to JPEG, and periodically broadcasts state.
    Targets roughly 60 iterations per second (16 ms budget per loop).
    """
    global running, latest_frame, camera_follow, t_target_visible, renderer, needs_robot_switch, env
    print("Starting simulation loop...")
    # Initialize renderer
    renderer = mujoco.Renderer(env.model, height=env.height, width=env.width)
    physics_steps_per_frame = 5
    broadcast_counter = 0
    def _run_iteration():
        # One full simulation/render cycle; exceptions propagate to the
        # outer while-loop, which logs them and keeps the loop alive.
        nonlocal broadcast_counter
        global needs_robot_switch, latest_frame, jogging_active, last_teleop_command_time
        # Apply a pending robot/scene switch requested by a client.
        if needs_robot_switch is not None:
            with mujoco_lock:
                switch_robot(
                    needs_robot_switch.get("robot"),
                    scene_name=needs_robot_switch.get("scene")
                )
            needs_robot_switch = None
        # Check episode duration watchdog
        current_time = time.time()
        with episode_control_lock:
            episode_start = episode_control_state.get("start_time", 0.0)
            max_duration = episode_control_state.get("max_duration", 120.0)
            if episode_start > 0 and (current_time - episode_start) > max_duration:
                # Episode has exceeded max duration - automatically truncate
                print(f"[Watchdog] Episode exceeded max duration ({max_duration}s) - truncating episode")
                episode_control_state["truncate"] = True
                episode_control_state["start_time"] = 0.0  # Reset timer
                # Stop any active jogging
                # NOTE(review): mujoco_lock is acquired while holding
                # episode_control_lock here — confirm this ordering is used
                # consistently elsewhere to avoid deadlock.
                if jogging_active:
                    with mujoco_lock:
                        if env is not None and current_robot in ("ur5", "ur5_t_push"):
                            stop_jog_fn = getattr(env, "stop_jog", None)
                            if callable(stop_jog_fn):
                                try:
                                    stop_jog_fn()
                                    jogging_active = False
                                except Exception as e:
                                    print(f"[Watchdog] Error stopping jogging during truncation: {e}")
        with mujoco_lock:
            if env is None:
                return
            sim_dt = env.model.opt.timestep
            # Run physics steps
            for _ in range(physics_steps_per_frame):
                env.step_with_controller(dt=sim_dt)
            # Update camera to follow robot or object
            if current_robot in ("ur5", "ur5_t_push"):
                # For UR5: follow t_object when enabled, table center when disabled
                if camera_follow:
                    # Follow t_object if it exists
                    t_object_id = getattr(env, 't_object_body_id', -1)
                    if t_object_id >= 0:
                        object_pos = env.data.xpos[t_object_id]
                        cam.lookat[0] = object_pos[0]
                        cam.lookat[1] = object_pos[1]
                        cam.lookat[2] = object_pos[2]
                else:
                    # Center on table
                    cam.lookat[0] = 0.5
                    cam.lookat[1] = 0.0
                    cam.lookat[2] = 0.42
            elif camera_follow:
                # For locomotion robots: follow robot base
                robot_pos = env.data.qpos[:3]
                cam.lookat[0] = robot_pos[0]
                cam.lookat[1] = robot_pos[1]
                cam.lookat[2] = 0.8 if current_robot == "g1" else 0.4
            # Render
            if renderer is not None:
                renderer.update_scene(env.data, camera=cam)
                # Hide t_target if visibility is disabled (push-T scene only)
                if current_robot == "ur5_t_push" and not t_target_visible:
                    t_target_id = getattr(env, 't_target_body_id', -1)
                    if t_target_id >= 0:
                        # Set alpha to 0 for all geoms belonging to t_target body
                        for i in range(renderer.scene.ngeom):
                            geom = renderer.scene.geoms[i]
                            # Check if this geom belongs to t_target body by comparing position
                            target_pos = env.data.xpos[t_target_id]
                            geom_pos = geom.pos
                            # If geom is very close to target body, it likely belongs to it
                            dist = np.linalg.norm(np.array(geom_pos) - target_pos)
                            if dist < 0.15:  # Within 15cm - likely part of the T-shape
                                geom.rgba[3] = 0.0  # Set alpha to 0 (invisible)
                # Add perception visualization markers
                _render_perception_frames(renderer, env.model, env.data)
                frame = renderer.render()
            else:
                return
        # Encode frame (JPEG) and publish it for the MJPEG stream.
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        ret, buffer = cv2.imencode('.jpg', frame_bgr)
        if ret:
            with frame_lock:
                latest_frame = buffer.tobytes()
        # Render overlay camera frames (if configured); the state dict is
        # snapshotted under the lock so iteration is safe against mutation.
        with overlay_camera_lock:
            overlay_snapshot = list(overlay_camera_states.items())
        for name, state in overlay_snapshot:
            renderer_obj = state.get("renderer")
            cam_obj = state.get("camera")
            if renderer_obj is None or cam_obj is None:
                if cam_obj is None:
                    continue
                # Lazily create the overlay renderer on first use.
                with mujoco_lock:
                    if env is None:
                        continue
                    try:
                        renderer_obj = mujoco.Renderer(
                            env.model,
                            height=OVERLAY_RENDER_HEIGHT,
                            width=OVERLAY_RENDER_WIDTH
                        )
                    except Exception:
                        continue
                with overlay_camera_lock:
                    if name in overlay_camera_states:
                        overlay_camera_states[name]["renderer"] = renderer_obj
            if renderer_obj is None:
                continue
            # Optional site-following camera: aim at (or offset from) a site.
            if state.get("follow_site") and env is not None:
                site_id = state.get("site_id", -1)
                if site_id >= 0 and site_id < getattr(env.model, "nsite", 0):
                    site_xpos = getattr(env.data, "site_xpos", None)
                    if site_xpos is not None and site_xpos.size >= (site_id + 1) * 3:
                        lookat_point = site_xpos[site_id]
                    else:
                        continue
                    offset = state.get("offset")
                    if offset is not None:
                        lookat_point = lookat_point + offset
                    forward_offset = state.get("forward_offset", 0.0)
                    if forward_offset:
                        forward = _get_site_forward(env, site_id)
                        lookat_point = lookat_point + forward * forward_offset
                    look_target_id = state.get("look_target_id", -1)
                    target_point = None
                    if look_target_id >= 0 and site_xpos.size >= (look_target_id + 1) * 3:
                        target_point = site_xpos[look_target_id]
                    cam_obj.lookat = target_point if target_point is not None else lookat_point
                    if state.get("track_orientation"):
                        forward = _get_site_forward(env, site_id)
                        yaw = math.degrees(math.atan2(forward[1], forward[0]))
                        pitch = math.degrees(math.asin(np.clip(forward[2], -1.0, 1.0)))
                        cam_obj.azimuth = -yaw + 90.0
                        cam_obj.elevation = -pitch
            renderer_obj.update_scene(env.data, camera=cam_obj)
            # Hide t_target if visibility is disabled (push-T scene only)
            if current_robot == "ur5_t_push" and not t_target_visible:
                t_target_id = getattr(env, 't_target_body_id', -1)
                if t_target_id >= 0:
                    # Set alpha to 0 for all geoms belonging to t_target body
                    for i in range(renderer_obj.scene.ngeom):
                        geom = renderer_obj.scene.geoms[i]
                        # Check if this geom belongs to t_target body by comparing position
                        target_pos = env.data.xpos[t_target_id]
                        geom_pos = geom.pos
                        # If geom is very close to target body, it likely belongs to it
                        dist = np.linalg.norm(np.array(geom_pos) - target_pos)
                        if dist < 0.15:  # Within 15cm - likely part of the T-shape
                            geom.rgba[3] = 0.0  # Set alpha to 0 (invisible)
            overlay_frame = renderer_obj.render()
            if overlay_frame is None:
                continue
            overlay_bgr = cv2.cvtColor(overlay_frame, cv2.COLOR_RGB2BGR)
            ret2, buffer2 = cv2.imencode('.jpg', overlay_bgr)
            if ret2:
                with overlay_frame_lock:
                    overlay_frames[name] = buffer2.tobytes()
        # Broadcast state every 6th iteration (~10 Hz at 60 fps).
        broadcast_counter += 1
        if broadcast_counter >= 6:
            broadcast_counter = 0
            broadcast_state()
    while running:
        loop_start = time.time()
        try:
            _run_iteration()
        except Exception as e:
            print(f"Error in simulation loop: {e}")
            import traceback
            traceback.print_exc()
        # Sleep the remainder of the 16 ms budget (minimum 1 ms).
        elapsed = time.time() - loop_start
        wait = max(0.001, 0.016 - elapsed)
        time.sleep(wait)
    if renderer:
        renderer.close()
    print("Simulation loop stopped.")
# Start simulation thread
# Daemon thread: it exits automatically when the main Flask process terminates.
sim_thread = threading.Thread(target=simulation_loop, daemon=True)
sim_thread.start()
def generate_frames():
    """Yield the latest main-camera JPEG as an MJPEG multipart stream (~25 fps).

    Bug fix: the previous version yielded while still holding frame_lock,
    so a slow HTTP consumer could block the simulation thread from
    publishing new frames. The frame bytes are now copied out under the
    lock and yielded after releasing it.
    """
    while True:
        with frame_lock:
            frame = latest_frame
        if frame is not None:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        time.sleep(0.04)
def generate_overlay_frames(name: str):
    """Yield the latest JPEG for overlay camera `name` as an MJPEG stream.

    Bug fix: the previous version yielded while still holding
    overlay_frame_lock, so a slow HTTP consumer could block the simulation
    thread from publishing overlay frames. The frame bytes are now copied
    out under the lock and yielded after releasing it.
    """
    while True:
        with overlay_frame_lock:
            frame = overlay_frames.get(name)
        if frame is not None:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
        time.sleep(0.04)
def _set_jogging_paused(paused: bool) -> bool:
    """Pause or resume jogging commands; returns the previous pause state."""
    global jogging_paused
    with jogging_pause_lock:
        previous, jogging_paused = jogging_paused, paused
    return previous
def _is_jogging_paused() -> bool:
    """Return True while jogging commands are paused (lock-protected read)."""
    with jogging_pause_lock:
        paused = jogging_paused
    return paused
def _step_multi_axis_homing_locked(tolerance: float = 0.01) -> dict:
    """Run one homing control step (expects mujoco_lock + homing_lock held).

    Proportional multi-joint homing toward env.home_pose with sign-flip
    detection (zero velocity on overshoot), quadratic damping near the
    target, and a minimum-velocity cutoff. Persists controller state in
    the module-level homing_state dict between steps.

    Args:
        tolerance: per-joint position tolerance (rad) for "at target".

    Returns:
        dict with "status" of "unsupported", "done", or "running", plus
        per-joint "errors"/"max_abs_error" (and "velocities" when running).
    """
    homing_state["last_home_time"] = time.time()
    # Homing is only implemented for the UR5 variants.
    if env is None or current_robot not in ("ur5", "ur5_t_push"):
        return {"status": "unsupported"}
    home_pose = getattr(env, "home_pose", None)
    get_joints = getattr(env, "get_joint_positions", None)
    start_multi_joint_jog = getattr(env, "start_multi_joint_jog", None)
    stop_jog = getattr(env, "stop_jog", None)
    if home_pose is None or not callable(get_joints) or not callable(start_multi_joint_jog) or not callable(stop_jog):
        return {"status": "unsupported"}
    if hasattr(home_pose, "tolist"):
        target_positions = home_pose.tolist()
    else:
        target_positions = list(home_pose)
    current_positions = get_joints()
    # Calculate errors for all joints
    errors = []
    for joint_idx in range(min(len(target_positions), len(current_positions))):
        current = current_positions[joint_idx]
        target = target_positions[joint_idx]
        error = target - current
        errors.append(error)
    max_abs_error = max(abs(e) for e in errors) if errors else 0.0
    # All joints within tolerance: stop jogging and clear controller state.
    if max_abs_error < tolerance:
        if homing_state.get("is_jogging", False):
            stop_jog()
            homing_state["is_jogging"] = False
        homing_state["active"] = False
        homing_state.pop("previous_errors", None)
        homing_state.pop("current_velocities", None)
        print("[Homing] Completed - all joints at target")
        return {
            "status": "done",
            "errors": errors,
            "max_abs_error": max_abs_error,
        }
    if "previous_errors" not in homing_state:
        homing_state["previous_errors"] = [0.0] * len(errors)
    base_velocity = 0.15  # rad/s - slower to prevent overshoot
    velocities = []
    stop_tolerance = tolerance * 0.5
    for joint_idx, error in enumerate(errors):
        abs_error = abs(error)
        prev_error = homing_state["previous_errors"][joint_idx]
        # Sign flip since the previous step means the joint overshot:
        # command zero velocity for this joint this cycle.
        if prev_error != 0 and ((prev_error > 0 and error < 0) or (prev_error < 0 and error > 0)):
            velocities.append(0.0)
            continue
        if abs_error < stop_tolerance:
            velocities.append(0.0)
        else:
            # Velocity proportional to this joint's share of the worst error.
            if max_abs_error > 0:
                velocity = base_velocity * (abs_error / max_abs_error)
            else:
                velocity = 0.0
            # Quadratic damping close to the target to avoid oscillation.
            damping_zone = tolerance * 3.0
            if abs_error < damping_zone:
                damping_factor = (abs_error / damping_zone) ** 2
                velocity *= damping_factor
            # Velocities below the deadband are clamped to zero.
            min_velocity = 0.02
            if velocity < min_velocity:
                velocities.append(0.0)
            else:
                velocities.append(velocity if error > 0 else -velocity)
    homing_state["previous_errors"] = errors
    # Every joint commanded to zero: treat as done (reached or stalled).
    if all(abs(v) < 0.001 for v in velocities):
        if homing_state.get("is_jogging", False):
            stop_jog()
            homing_state["is_jogging"] = False
        homing_state["active"] = False
        homing_state.pop("previous_errors", None)
        homing_state.pop("current_velocities", None)
        print("[Homing] All joints reached target or stopped")
        return {
            "status": "done",
            "errors": errors,
            "max_abs_error": max_abs_error,
        }
    # Only restart the jog when commanded velocities changed noticeably,
    # avoiding a stop/start cycle every step.
    current_velocities = homing_state.get("current_velocities")
    velocities_changed = current_velocities is None or any(
        abs(v1 - v2) > 0.01 for v1, v2 in zip(velocities, current_velocities)
    )
    if velocities_changed:
        if homing_state.get("is_jogging", False):
            stop_jog()
        start_multi_joint_jog(velocities)
        homing_state["is_jogging"] = True
        homing_state["current_velocities"] = velocities
    homing_state["active"] = True
    return {
        "status": "running",
        "errors": errors,
        "max_abs_error": max_abs_error,
        "velocities": velocities,
    }
def handle_ws_message(ws, data):
    """Handle incoming WebSocket message.

    Dispatches on data["type"]. Identity/notification messages are routed
    to _handle_external_client_message; legacy types ('command',
    'teleop_command') are mapped to their current names; all remaining
    types mutate simulation/camera/episode state under the proper locks.
    """
    global needs_robot_switch, camera_follow, t_target_visible, last_teleop_action, last_teleop_command_time, jogging_active
    msg_type = data.get('type')
    if msg_type in ("client_identity", "trainer_identity", "client_notification", "notification"):
        _handle_external_client_message(ws, data)
        return
    # Backward compatibility: map old message types to new ones
    if msg_type == 'command':
        msg_type = 'action'
        data['type'] = 'action'
    elif msg_type == 'teleop_command':
        msg_type = 'teleop_action'
        data['type'] = 'teleop_action'
    if msg_type == 'action':
        # Full action vector: linear/angular velocities, per-joint
        # velocities, and optional gripper command.
        payload = data.get('data', {})
        vx = payload.get('vx', 0.0)
        vy = payload.get('vy', 0.0)
        vz = payload.get('vz', 0.0)
        vyaw = payload.get('vyaw', 0.0)
        vrx = payload.get('vrx', 0.0)
        vry = payload.get('vry', 0.0)
        vrz = payload.get('vrz', 0.0)
        j1 = payload.get('j1', 0.0)
        j2 = payload.get('j2', 0.0)
        j3 = payload.get('j3', 0.0)
        j4 = payload.get('j4', 0.0)
        j5 = payload.get('j5', 0.0)
        j6 = payload.get('j6', 0.0)
        gripper = payload.get('gripper', None)
        # While jogging is paused on UR5, only gripper commands pass through.
        if current_robot in ("ur5", "ur5_t_push") and _is_jogging_paused():
            if gripper is not None:
                with mujoco_lock:
                    if env is not None:
                        env.set_gripper(float(gripper))
            return
        with mujoco_lock:
            if env is not None:
                # For UR5: translate velocity actions to jogging commands
                if current_robot in ("ur5", "ur5_t_push"):
                    # Handle gripper if specified
                    if gripper is not None:
                        env.set_gripper(float(gripper))
                    # Check which type of velocity is active (joint vs cartesian)
                    joint_velocities = [float(j1), float(j2), float(j3), float(j4), float(j5), float(j6)]
                    cartesian_translation = [float(vx), float(vy), float(vz)]
                    cartesian_rotation = [float(vrx), float(vry), float(vrz)]
                    any_joint = any(abs(v) > 0.001 for v in joint_velocities)
                    any_cartesian = any(abs(v) > 0.001 for v in cartesian_translation + cartesian_rotation)
                    # Update teleop command timestamp
                    last_teleop_command_time = time.time()
                    # Apply the appropriate jogging mode (joint wins over cartesian)
                    if any_joint:
                        env.start_multi_joint_jog(joint_velocities)
                        jogging_active = True
                    elif any_cartesian:
                        # Convert m/s to mm/s for translations
                        translation_mm_s = [v * 1000.0 for v in cartesian_translation]
                        env.start_jog(
                            'cartesian_velocity',
                            translation=translation_mm_s,
                            rotation=cartesian_rotation,
                            tcp_id='Flange',
                            coord_system_id='world'
                        )
                        jogging_active = True
                    else:
                        # No active velocity - stop jogging
                        env.stop_jog()
                        jogging_active = False
                else:
                    # For locomotion robots: use vx, vy, vyaw
                    env.set_command(vx, vy, vyaw)
        with teleop_lock:
            # Update all velocity fields in teleop_action
            last_teleop_action["vx"] = float(vx)
            last_teleop_action["vy"] = float(vy)
            last_teleop_action["vz"] = float(vz)
            last_teleop_action["vyaw"] = float(vyaw)
            last_teleop_action["vrx"] = float(vrx)
            last_teleop_action["vry"] = float(vry)
            last_teleop_action["vrz"] = float(vrz)
            last_teleop_action["j1"] = float(j1)
            last_teleop_action["j2"] = float(j2)
            last_teleop_action["j3"] = float(j3)
            last_teleop_action["j4"] = float(j4)
            last_teleop_action["j5"] = float(j5)
            last_teleop_action["j6"] = float(j6)
            if gripper is not None:
                last_teleop_action["gripper"] = float(gripper)
    elif msg_type == 'reset':
        # Reset the environment (optionally seeded) and restart the
        # episode-duration watchdog.
        payload = data.get('data', {})
        seed = payload.get('seed')
        with mujoco_lock:
            if env is not None:
                obs, info = env.reset(seed=seed)
        # Track episode start time for duration watchdog
        with episode_control_lock:
            episode_control_state["start_time"] = time.time()
            episode_control_state["terminate"] = False
            episode_control_state["truncate"] = False
    elif msg_type == 'switch_robot':
        payload = data.get('data', {})
        robot = payload.get('robot', 'g1')
        scene = payload.get('scene')
        _schedule_robot_switch(robot, scene)
    elif msg_type == 'camera':
        # Free-camera manipulation: rotate / zoom / set_distance / pan.
        payload = data.get('data', {})
        a = cam.azimuth * np.pi / 180.0
        e = cam.elevation * np.pi / 180.0
        cam_type = payload.get('action')
        if cam_type == 'rotate':
            cam.azimuth -= payload.get('dx', 0) * 0.5
            cam.elevation -= payload.get('dy', 0) * 0.5
            cam.elevation = np.clip(cam.elevation, -89, 89)
        elif cam_type == 'zoom':
            cam.distance += payload.get('dz', 0) * 0.01
            cam.distance = max(0.5, min(20.0, cam.distance))
        elif cam_type == 'set_distance':
            cam.distance = payload.get('distance', 3.0)
        elif cam_type == 'pan':
            # Pan in the camera's screen plane, scaled by distance.
            right = np.array([np.sin(a), -np.cos(a), 0])
            up = np.array([-np.cos(a) * np.sin(e), -np.sin(a) * np.sin(e), np.cos(e)])
            scale = cam.distance * 0.002
            cam.lookat -= (payload.get('dx', 0) * right - payload.get('dy', 0) * up) * scale
    elif msg_type == 'camera_follow':
        payload = data.get('data', {})
        camera_follow = payload.get('follow', True)
    elif msg_type == 'toggle_target_visibility':
        payload = data.get('data', {})
        t_target_visible = payload.get('visible', True)
    elif msg_type == 'teleop_action':
        # UR5 end-effector teleop: incremental Cartesian target move.
        payload = data.get('data', {})
        # Accept both old format (dx/dy/dz) and new format (vx/vy/vz)
        vx = float(payload.get('vx', payload.get('dx', 0.0)))
        vy = float(payload.get('vy', payload.get('dy', 0.0)))
        vz = float(payload.get('vz', payload.get('dz', 0.0)))
        timestamp = time.time()
        if current_robot in ("ur5", "ur5_t_push") and _is_jogging_paused():
            return
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                # Check if all velocities are zero - if so, stop any active jogging
                if abs(vx) < 0.001 and abs(vy) < 0.001 and abs(vz) < 0.001:
                    stop_jog = getattr(env, "stop_jog", None)
                    if callable(stop_jog):
                        stop_jog()
                else:
                    # Apply teleop movement
                    target = env.get_target()
                    env.set_target(
                        target[0] + vx,
                        target[1] + vy,
                        target[2] + vz,
                        update_joint_targets=True
                    )
                updated_target = env.get_target()
            else:
                updated_target = None
        with teleop_lock:
            # Update UR5 Cartesian velocities, preserve other fields
            last_teleop_action["vx"] = vx
            last_teleop_action["vy"] = vy
            last_teleop_action["vz"] = vz
            last_teleop_action["vyaw"] = 0.0
            # Clear jogging velocities when using teleop (jogging and teleop are mutually exclusive)
            for j in range(1, 7):
                last_teleop_action[f"j{j}"] = 0.0
            last_teleop_action["vrx"] = 0.0
            last_teleop_action["vry"] = 0.0
            last_teleop_action["vrz"] = 0.0
        broadcast_to_external_clients(
            "action_update",
            {
                "robot": current_robot,
                "scene": getattr(env, "scene_name", None) if env is not None else None,
                "delta": {"vx": vx, "vy": vy, "vz": vz},
                "ee_target": updated_target.tolist() if updated_target is not None else None,
                "timestamp": timestamp,
            }
        )
    elif msg_type == 'episode_control':
        payload = data.get('data', {})
        action = payload.get('action')
        _signal_episode_control(action)
    # UR5-specific messages
    elif msg_type == 'arm_target':
        payload = data.get('data', {})
        x = payload.get('x', 0.4)
        y = payload.get('y', 0.0)
        z = payload.get('z', 0.6)
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                env.set_target(x, y, z)
    elif msg_type == 'gripper':
        payload = data.get('data', {})
        action = payload.get('action', 'open')
        # Robotiq 2F-85: 0 = open, 255 = closed (official MuJoCo Menagerie)
        if action == 'open':
            value = 0  # 0 = fully open
        elif action == 'close':
            value = 255  # 255 = fully closed
        else:
            value = payload.get('value', 128)
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                env.set_gripper(value)
        # Update teleop_action with gripper value
        with teleop_lock:
            last_teleop_action["gripper"] = float(value)
    elif msg_type == 'control_mode':
        payload = data.get('data', {})
        mode = payload.get('mode', 'ik')
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                env.set_control_mode(mode)
    elif msg_type == 'joint_positions':
        payload = data.get('data', {})
        positions = payload.get('positions', [])
        if len(positions) == 6:
            with mujoco_lock:
                if env is not None and current_robot in ("ur5", "ur5_t_push"):
                    env.set_joint_positions(positions)
    elif msg_type == 'arm_orientation':
        payload = data.get('data', {})
        roll = payload.get('roll', 0.0)
        pitch = payload.get('pitch', np.pi/2)
        yaw = payload.get('yaw', 0.0)
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                env.set_target_orientation(roll, pitch, yaw)
    elif msg_type == 'use_orientation':
        payload = data.get('data', {})
        use = payload.get('enabled', True)
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push"):
                env.set_use_orientation(use)
    elif msg_type == 'perception_update':
        # Handle perception updates from training system
        payload = data.get('data', {})
        poses = payload.get('poses', [])
        timestamp = payload.get('timestamp', time.time())
        with perception_lock:
            perception_state["poses"] = poses
            perception_state["last_update"] = timestamp
        # print(f"[Perception] Received {len(poses)} pose(s) at {timestamp:.2f}")
    elif msg_type == 'set_nova_mode':
        payload = data.get('data', {})
        enabled = bool(payload.get('enabled', False))
        with mujoco_lock:
            if env is not None and current_robot in ("ur5", "ur5_t_push") and hasattr(env, 'set_nova_enabled'):
                env.set_nova_enabled(enabled)
    # Nova jogging commands
    elif msg_type == 'homing':
        # Snap joints directly to the home pose (no velocity ramp).
        with mujoco_lock:
            if env is not None:
                home_pose = getattr(env, "home_pose", None)
                set_joints = getattr(env, "set_joint_positions", None)
                if home_pose is not None and callable(set_joints):
                    if hasattr(home_pose, "tolist"):
                        positions = home_pose.tolist()
                    else:
                        positions = list(home_pose)
                    set_joints(positions)
def _serialize_value(value):
"""Recursively convert numpy data to JSON-friendly types."""
if isinstance(value, np.ndarray):
return _serialize_value(value.tolist())
if isinstance(value, dict):
return {str(k): _serialize_value(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_serialize_value(v) for v in value]
if isinstance(value, (np.integer,)):
return int(value)
if isinstance(value, (np.floating,)):
return float(value)
return value
def _serialize_space(space):
if hasattr(space, "low") and hasattr(space, "high"):
import math
# Convert numpy arrays to lists, replacing inf with null for valid JSON
low_list = space.low.tolist()
high_list = space.high.tolist()
# Replace infinity values with null (JSON doesn't support Infinity)
low_json = [None if math.isinf(v) else v for v in low_list]
high_json = [None if math.isinf(v) else v for v in high_list]
return {
"type": "box",
"shape": list(space.shape),
"low": low_json,
"high": high_json,
"dtype": str(space.dtype),
}
return {"type": "unknown"}
def _resolve_robot_scene(robot, scene):
if robot == "ur5_t_push" and scene is None:
return "ur5", "scene_t_push"
if robot == "ur5" and scene is None:
return "ur5", "scene"
return robot, scene
def _create_env(robot, scene):
    """Instantiate the simulation environment for the requested robot.

    Spot/UR5 environment modules live under robots/<name> and are imported
    via a temporary sys.path entry.

    Raises:
        ValueError: for an unrecognized robot name.
    """
    if robot == "g1":
        return G1Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT)
    if robot == "spot":
        sys.path.insert(0, os.path.join(_nova_sim_dir, 'robots', 'spot'))
        from spot_env import SpotEnv
        sys.path.pop(0)
        return SpotEnv(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, controller_type='pympc')
    if robot == "ur5":
        sys.path.insert(0, os.path.join(_nova_sim_dir, 'robots', 'ur5'))
        from ur5_env import UR5Env
        sys.path.pop(0)
        kwargs = {"render_mode": "rgb_array", "width": RENDER_WIDTH, "height": RENDER_HEIGHT}
        # scene is already resolved by _resolve_robot_scene (e.g., "scene_t_push")
        if scene:
            kwargs["scene_name"] = scene
        return UR5Env(**kwargs)
    raise ValueError(f"Unsupported robot for gym: {robot}")
@sock.route(f'{API_PREFIX}/ws')
def websocket_handler(ws):
    """Handle WebSocket connections.

    Registers the client, pumps incoming JSON messages to
    handle_ws_message until the socket closes, then unregisters the
    client and re-broadcasts state.
    """
    print('WebSocket client connected', flush=True)
    # Register client (track all clients, even before they identify)
    with ws_clients_lock:
        ws_clients.add(ws)
    # Register in metadata as unidentified
    _register_external_client(ws)
    broadcast_connected_clients_status()
    try:
        while True:
            message = ws.receive()
            if message is None:
                break
            try:
                data = json.loads(message)
                handle_ws_message(ws, data)
            except json.JSONDecodeError:
                print(f"Invalid JSON received: {message}")
            except Exception as e:
                print(f"Error handling message: {e}")
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit are
        # not swallowed; a transport error simply ends the receive loop.
        pass
    finally:
        # Unregister client from every registry.
        with ws_clients_lock:
            ws_clients.discard(ws)
        with external_ws_clients_lock:
            external_ws_clients.discard(ws)
        # Always unregister from metadata (all clients are tracked, not just external ones)
        _unregister_external_client(ws)
        broadcast_state()
        broadcast_connected_clients_status()
        print('WebSocket client disconnected')
# Serve UI at /nova-sim (no redirect)
@app.route('/nova-sim')
@app.route('/nova-sim/')
def nova_sim_ui():
    """Serve the frontend UI directly (same body as index(), no redirect)."""
    return index()
# Redirect root to /nova-sim
@app.route('/')
def root_redirect():
    """Serve a meta-refresh page that redirects the browser to /nova-sim."""
    return render_template_string("""
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="refresh" content="0; url='/nova-sim'" />
</head>
<body>
<p>Redirecting to <a href="/nova-sim">/nova-sim</a>...</p>
</body>
</html>
""")
@app.route(API_PREFIX)
@app.route(f'{API_PREFIX}/')
def index():
    """Serve the frontend single-page UI with browser caching disabled."""
    # Load HTML from frontend/index.html
    html_path = Path(_nova_sim_dir) / 'frontend' / 'index.html'
    try:
        html_template = html_path.read_text(encoding='utf-8')
    except FileNotFoundError:
        return "Error: frontend/index.html not found", 500
    # Replace template variables (Python string concatenation syntax from old inline HTML)
    response = make_response(html_template.replace('""" + API_PREFIX + """', API_PREFIX))
    # Prevent browser caching of the HTML/JavaScript
    no_cache_headers = {
        'Cache-Control': 'no-store, no-cache, must-revalidate, max-age=0',
        'Pragma': 'no-cache',
        'Expires': '0',
    }
    for header, value in no_cache_headers.items():
        response.headers[header] = value
    return response
@app.route(f'{API_PREFIX}/video_feed')
def video_feed():
    """Stream the main simulation camera as an MJPEG multipart response."""
    stream = generate_frames()
    return Response(stream, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route(f'{API_PREFIX}/cameras', methods=['POST'])
def add_camera():
    """Add an overlay camera for the current robot/scene.

    POST body: dynamic camera config (validated by
    _build_dynamic_camera_config) plus optional "replace" flag. Returns
    the camera descriptor (stream URL, pose, intrinsics) on success;
    400 for invalid config/sites, 409 if the name exists and replace is
    false, 503 when no environment is initialized.
    """
    payload = request.get_json(silent=True) or {}
    replace_existing = bool(payload.get("replace", False))
    try:
        config = _build_dynamic_camera_config(payload)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    with mujoco_lock:
        if env is None:
            return jsonify({"error": "no environment initialized"}), 503
        scene_name = getattr(env, "scene_name", None) or current_scene
        robot_type = current_robot
        # Validate site references if provided
        follow_site = config.get("follow_site")
        look_target_site = config.get("look_target_site")
        if follow_site:
            site_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, follow_site)
            if site_id < 0:
                return jsonify({"error": f"follow_site '{follow_site}' not found"}), 400
        if look_target_site:
            target_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, look_target_site)
            if target_id < 0:
                return jsonify({"error": f"look_target_site '{look_target_site}' not found"}), 400
        temp_cam = _make_mjv_camera(config)
        feed_intrinsics = _camera_intrinsics(
            OVERLAY_RENDER_WIDTH,
            OVERLAY_RENDER_HEIGHT,
            _camera_fovy_degrees(temp_cam or cam, env),
        )
    camera_payload = {
        "name": config["name"],
        "label": config.get("label", config["name"]),
        "url": f"{API_PREFIX}/camera/{config['name']}/video_feed",
        "pose": _camera_pose(temp_cam),
        "intrinsics": feed_intrinsics,
    }
    # Conflict check is done under the lock; registration happens below.
    with overlay_camera_lock:
        exists = config["name"] in overlay_camera_states
        if exists and not replace_existing:
            return jsonify({"error": "camera already exists", "name": config["name"]}), 409
    _store_dynamic_camera_config(robot_type, scene_name, config)
    state_payload = _build_overlay_camera_state(config, create_renderer=False)
    # Store reference to old renderer to close after state update (prevents race condition)
    old_renderer = None
    with overlay_camera_lock:
        if config["name"] in overlay_camera_states:
            old_renderer = overlay_camera_states[config["name"]].get("renderer")
        # Update state first before closing old renderer to prevent use-after-free
        overlay_camera_states[config["name"]] = state_payload
        with overlay_frame_lock:
            overlay_frames[config["name"]] = None
    # Close old renderer after state is updated (prevents simulation loop from accessing freed renderer)
    if old_renderer:
        try:
            old_renderer.close()
        except Exception:
            pass
    broadcast_camera_event(
        "added",
        camera_payload,
        {"robot": robot_type, "scene": scene_name},
    )
    return jsonify({
        "status": "ok",
        "camera": camera_payload,
        "scope": {"robot": robot_type, "scene": scene_name},
    })
@app.route(f'{API_PREFIX}/camera/<name>/video_feed')
def camera_feed(name):
    """Serve an MJPEG stream for the main camera or a named overlay camera.

    Returns 404 JSON when the overlay camera name is unknown.
    """
    mjpeg_mimetype = 'multipart/x-mixed-replace; boundary=frame'
    if name == "main":
        return Response(generate_frames(), mimetype=mjpeg_mimetype)
    with overlay_frame_lock:
        known = name in overlay_frames
    if not known:
        return jsonify({"error": "Camera feed not available"}), 404
    return Response(generate_overlay_frames(name), mimetype=mjpeg_mimetype)
def capture_depth_snapshot(camera_name: str) -> Optional[bytes]:
    """Capture a depth image from the specified camera and return as PNG bytes.

    Depth is encoded as 16-bit PNG in millimeters (metric depth), clipped
    to the model's zfar (capped at 65.535 m, the uint16-mm limit). A
    temporary depth renderer is created and closed per call.

    Args:
        camera_name: Name of the camera ("main" or overlay camera name)

    Returns:
        PNG-encoded depth image as bytes, or None if capture fails
    """
    global env, cam, overlay_camera_states
    if env is None:
        return None
    # Determine which camera to use
    camera_obj = None
    if camera_name == "main":
        camera_obj = cam
    else:
        with overlay_camera_lock:
            if camera_name in overlay_camera_states:
                camera_obj = overlay_camera_states[camera_name].get("camera")
    if camera_obj is None:
        return None
    # Create a temporary renderer for depth capture
    with mujoco_lock:
        try:
            if camera_name == "main":
                render_height = env.height
                render_width = env.width
            else:
                render_height = OVERLAY_RENDER_HEIGHT
                render_width = OVERLAY_RENDER_WIDTH
            depth_renderer = mujoco.Renderer(
                env.model,
                height=render_height,
                width=render_width
            )
            # Enable depth rendering
            depth_renderer.enable_depth_rendering()
            # Update scene and render depth
            depth_renderer.update_scene(env.data, camera=camera_obj)
            depth_array = depth_renderer.render()  # Returns (height, width) float32
            # Close the temporary renderer
            depth_renderer.close()
            # Convert depth to 16-bit PNG in millimeters (metric depth).
            # MuJoCo renderer returns depth in meters; preserve metric scale.
            max_depth_m = 10.0
            try:
                # Prefer the model's configured far clipping plane if present.
                vis_map = getattr(env.model, "vis", None)
                vis_map = getattr(vis_map, "map", None) if vis_map is not None else None
                zfar = getattr(vis_map, "zfar", None) if vis_map is not None else None
                if zfar is not None:
                    max_depth_m = float(zfar)
            except Exception:
                pass
            # uint16 in mm can represent up to 65.535 m
            max_depth_m = min(max_depth_m, 65.535)
            depth_m = np.nan_to_num(depth_array, nan=0.0, posinf=max_depth_m, neginf=0.0)
            depth_m = np.clip(depth_m, 0.0, max_depth_m)
            depth_16bit = (depth_m * 1000.0).round().astype(np.uint16)
            # Encode as PNG
            ret, buffer = cv2.imencode('.png', depth_16bit)
            if not ret:
                return None
            return buffer.tobytes()
        except Exception as e:
            print(f"Error capturing depth snapshot: {e}")
            traceback.print_exc()
            return None
@app.route(f'{API_PREFIX}/camera/<name>/depth_snapshot')
def camera_depth_snapshot(name):
    """Serve a one-off depth capture for camera *name* as a PNG response."""
    png_bytes = capture_depth_snapshot(name)
    if png_bytes is None:
        # Capture failed (no env, unknown camera, or render error).
        return jsonify({"error": "Failed to capture depth image"}), 500
    resp = make_response(png_bytes)
    resp.headers['Content-Type'] = 'image/png'
    resp.headers['Content-Disposition'] = f'inline; filename="depth_{name}.png"'
    return resp
@app.route(f'{API_PREFIX}/metadata')
def metadata():
    """Return robot/scene catalog, available actions, and Nova API status.

    Also reports the currently selected robot and scene (plus the robot's
    home pose when the live environment exposes one) under the "current" key.
    """
    robots_meta = {}
    for robot, scenes in ROBOT_SCENES.items():
        robots_meta[robot] = {
            "label": ROBOT_LABELS.get(robot, robot),
            "scenes": scenes,
        }
    current_selection = {
        "robot": current_robot,
        "scene": current_scene,
    }
    with mujoco_lock:
        if env is not None and hasattr(env, "home_pose"):
            try:
                home_pose = env.home_pose
                if home_pose is not None:
                    pose_list = (
                        home_pose.tolist()
                        if hasattr(home_pose, "tolist")
                        else list(home_pose)
                    )
                    current_selection["home_pose"] = [float(val) for val in pose_list]
            except Exception:
                # Best-effort: a missing/odd home_pose must not break metadata.
                pass
    response = {
        "robots": robots_meta,
        # Fix: current_selection was previously computed but never returned.
        "current": current_selection,
        "actions": AVAILABLE_ACTIONS,
        "nova_api": {
            "preconfigured": _nova_credentials_present(),
        },
    }
    return jsonify(response)
@app.route(f'{API_PREFIX}/nova_config', methods=['POST'])
def update_nova_config():
    """Apply Nova API configuration from a JSON POST body.

    Maps payload fields to environment variables (clearing a variable when
    the value is null/empty), validates that the required credentials are
    present, then attempts to re-enable Nova on the live UR5 environment.
    Returns 400 on incomplete credentials, 500 on reconfiguration failure.
    """
    payload = request.get_json(silent=True) or {}
    # JSON payload field -> environment variable it controls.
    field_map = {
        "instance_url": "NOVA_INSTANCE_URL",
        "access_token": "NOVA_ACCESS_TOKEN",
        "cell_id": "NOVA_CELL_ID",
        "controller_id": "NOVA_CONTROLLER_ID",
        "motion_group_id": "NOVA_MOTION_GROUP_ID",
        "motion_group_model": "NOVA_MOTION_GROUP_MODEL",
        "tcp_name": "NOVA_TCP_NAME",
        "response_rate_ms": "NOVA_RESPONSE_RATE_MS",
    }
    updated = {}  # env var name -> new value (None means "cleared")
    for key, env_key in field_map.items():
        if key not in payload:
            # Absent fields are left untouched (partial updates allowed).
            continue
        value = payload.get(key)
        if value is None or (isinstance(value, str) and value.strip() == ""):
            # Null/empty value clears the variable.
            os.environ.pop(env_key, None)
            updated[env_key] = None
        else:
            os.environ[env_key] = str(value).strip()
            updated[env_key] = os.environ[env_key]
    if not _nova_credentials_present():
        # Reject when the mandatory credential set is still incomplete.
        return jsonify({
            "ok": False,
            "error": "Missing required Nova credentials",
            "updated": updated,
            "preconfigured": _nova_credentials_present(),
        }), 400
    # Feature toggles: UR5-specific vars take precedence over generic ones.
    use_state = _parse_bool(os.environ.get("NOVA_UR5_USE_STATE_STREAM") or os.environ.get("NOVA_USE_STATE_STREAM") or "true")
    use_ik = _parse_bool(os.environ.get("NOVA_UR5_USE_IK") or os.environ.get("NOVA_USE_IK") or "false")
    use_jogging = _parse_bool(os.environ.get("NOVA_UR5_USE_JOGGING") or os.environ.get("NOVA_USE_JOGGING") or "true")
    # Full config snapshot handed to the running environment below.
    nova_api_config = {
        "instance_url": os.getenv("NOVA_INSTANCE_URL") or os.getenv("NOVA_API"),
        "access_token": os.getenv("NOVA_ACCESS_TOKEN"),
        "cell_id": os.getenv("NOVA_CELL_ID", "cell"),
        "controller_id": os.getenv("NOVA_CONTROLLER_ID"),
        "motion_group_id": os.getenv("NOVA_MOTION_GROUP_ID"),
        "motion_group_model": os.getenv("NOVA_MOTION_GROUP_MODEL"),
        "tcp_name": os.getenv("NOVA_TCP_NAME"),
        "response_rate_ms": int(os.getenv("NOVA_RESPONSE_RATE_MS", "200")),
        "use_state_stream": use_state,
        "use_ik": use_ik,
        "use_jogging": use_jogging,
    }
    reconfigured = False
    with mujoco_lock:
        # Only the UR5 environments support live Nova reconfiguration.
        if env is not None and current_robot in ("ur5", "ur5_t_push") and hasattr(env, "set_nova_enabled"):
            try:
                if hasattr(env, "_nova_api_config"):
                    # Hand the env its own copy so later edits don't alias it.
                    env._nova_api_config = dict(nova_api_config)
                reconfigured = bool(env.set_nova_enabled(True))
            except Exception:
                traceback.print_exc()
                return jsonify({
                    "ok": False,
                    "error": "Failed to reconfigure Nova integration",
                    "updated": updated,
                    "preconfigured": _nova_credentials_present(),
                }), 500
    return jsonify({
        "ok": True,
        "updated": updated,
        "preconfigured": _nova_credentials_present(),
        "reconfigured": reconfigured,
    })
def _env_request(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception:
traceback.print_exc()
raise
return wrapper
@app.route(f'{API_PREFIX}/env')
@_env_request
def get_env_info():
    """Get static information about the current environment.

    Returns robot/scene identity, gripper and control-mode info, serialized
    action/observation spaces, camera feed descriptors (URL, pose,
    intrinsics), and the home pose when available. 503 when no env exists.
    """
    with mujoco_lock:
        if env is None:
            return jsonify({"error": "no environment initialized"}), 503
        # Get action and observation spaces
        action_space = _serialize_space(env.action_space)
        observation_space = _serialize_space(env.observation_space)
        # Get static environment properties (all optional on the env object)
        has_gripper = getattr(env, 'has_gripper', False)
        scene_name = getattr(env, 'scene_name', None)
        control_mode = getattr(env, 'get_control_mode', lambda: None)()
        # Build camera feed URLs with pose + intrinsics
        camera_feeds = []
        main_fovy = _camera_fovy_degrees(cam, env)
        main_intrinsics = _camera_intrinsics(env.width, env.height, main_fovy)
        camera_feeds.append({
            "name": "main",
            "label": "Main Camera",
            "url": f"{API_PREFIX}/video_feed",
            "pose": _camera_pose(cam),
            "intrinsics": main_intrinsics,
        })
        # Check for overlay cameras based on scene
        # (scene-specific presets win over robot-level presets)
        configs = (scene_name and OVERLAY_CAMERA_PRESETS.get(scene_name)) or OVERLAY_CAMERA_PRESETS.get(current_robot) or []
        dynamic_configs = _get_dynamic_camera_configs(current_robot, scene_name)
        if dynamic_configs:
            # Merge by name; dynamic configs override preset entries.
            merged_configs: dict[str, dict[str, Any]] = {}
            for feed in configs:
                merged_configs[feed.get("name", "aux")] = feed
            for feed in dynamic_configs:
                merged_configs[feed.get("name", "aux")] = feed
            configs = list(merged_configs.values())
        for feed in configs:
            feed_name = feed.get('name', 'aux')
            with overlay_camera_lock:
                # Copy under the lock so we can read state outside it.
                cam_state = overlay_camera_states.get(feed_name, {}).copy()
            cam_obj = cam_state.get("camera")
            # Fall back to the main camera's fovy if the overlay camera
            # object has not been created yet.
            feed_intrinsics = _camera_intrinsics(
                OVERLAY_RENDER_WIDTH,
                OVERLAY_RENDER_HEIGHT,
                _camera_fovy_degrees(cam_obj or cam, env),
            )
            feed_payload = {
                "name": feed_name,
                "label": feed.get('label', feed_name),
                "url": f"{API_PREFIX}/camera/{feed_name}/video_feed"
            }
            if cam_obj is not None:
                # Pose is only reported once the camera object exists.
                feed_payload["pose"] = _camera_pose(cam_obj)
            feed_payload["intrinsics"] = feed_intrinsics
            camera_feeds.append({
                **feed_payload
            })
        # Get home pose for current robot
        home_pose_list = None
        if hasattr(env, "home_pose"):
            try:
                home_pose = env.home_pose
                if home_pose is not None:
                    pose_list = (
                        home_pose.tolist()
                        if hasattr(home_pose, "tolist")
                        else list(home_pose)
                    )
                    home_pose_list = [float(val) for val in pose_list]
            except Exception:
                # Best-effort: omit home_pose rather than fail the request.
                pass
        response = {
            "robot": current_robot,
            "scene": scene_name or current_scene,
            "has_gripper": has_gripper,
            "control_mode": control_mode,
            "action_space": action_space,
            "observation_space": observation_space,
            "camera_feeds": camera_feeds
        }
        # Add home_pose if available
        if home_pose_list is not None:
            response["home_pose"] = home_pose_list
        return jsonify(response)
@app.route(f'{API_PREFIX}/homing', methods=['GET'])
@_env_request
def home_blocking():
    """Blocking homing endpoint: drive the robot to its home pose.

    Query params: timeout_s (default 30), tolerance (default 0.01),
    poll_interval_s (default 0.1) — all must be > 0. Non-UR5 robots are
    homed instantly via set_joint_positions; UR5 variants run an iterative
    multi-axis jog loop until within tolerance or until timeout.
    """
    timeout_s = float(request.args.get("timeout_s", 30.0))
    tolerance = float(request.args.get("tolerance", 0.01))
    poll_interval = float(request.args.get("poll_interval_s", 0.1))
    if timeout_s <= 0:
        return jsonify({"error": "timeout_s must be > 0"}), 400
    if tolerance <= 0:
        return jsonify({"error": "tolerance must be > 0"}), 400
    if poll_interval <= 0:
        return jsonify({"error": "poll_interval_s must be > 0"}), 400
    with mujoco_lock:
        if env is None:
            return jsonify({"error": "no environment initialized"}), 503
    if current_robot not in ("ur5", "ur5_t_push"):
        # Non-Nova robots: snap joints straight to the home pose.
        with mujoco_lock:
            home_pose = getattr(env, "home_pose", None)
            set_joints = getattr(env, "set_joint_positions", None)
            if home_pose is None or not callable(set_joints):
                return jsonify({"error": "homing not supported for current robot"}), 400
            positions = home_pose.tolist() if hasattr(home_pose, "tolist") else list(home_pose)
            set_joints(positions)
        return jsonify({"status": "completed", "robot": current_robot, "homed": True})
    with homing_lock:
        homing_active = homing_state.get("active")
    if homing_active:
        # Another homing request is in flight: wait for it to finish
        # (or for our own timeout) instead of starting a second one.
        active_start_time = time.time()
        while True:
            with homing_lock:
                if not homing_state.get("active"):
                    break
            if time.time() - active_start_time >= timeout_s:
                return jsonify({"status": "timeout", "robot": current_robot, "homed": False}), 200
            time.sleep(min(poll_interval, 0.5))
    with mujoco_lock, homing_lock:
        # Verify the env supports the full Nova jogging interface, then
        # claim the homing state while still holding both locks.
        home_pose = getattr(env, "home_pose", None)
        get_joints = getattr(env, "get_joint_positions", None)
        start_multi_joint_jog = getattr(env, "start_multi_joint_jog", None)
        stop_jog = getattr(env, "stop_jog", None)
        if home_pose is None or not callable(get_joints) or not callable(start_multi_joint_jog) or not callable(stop_jog):
            return jsonify({"error": "Nova homing not supported for current robot"}), 400
        homing_state["active"] = True
    # Suspend user jogging for the duration; restored in the finally block.
    previous_pause = _set_jogging_paused(True)
    start_time = time.time()
    status = "running"
    result_snapshot = {}
    try:
        with mujoco_lock:
            # Cancel any jog already in progress before homing starts.
            stop_jog = getattr(env, "stop_jog", None)
            if callable(stop_jog):
                stop_jog()
        while True:
            with mujoco_lock, homing_lock:
                result_snapshot = _step_multi_axis_homing_locked(tolerance=tolerance)
            if result_snapshot.get("status") == "done":
                status = "completed"
                break
            if time.time() - start_time >= timeout_s:
                status = "timeout"
                break
            time.sleep(poll_interval)
    finally:
        # Always stop jogging and reset homing bookkeeping, even on errors.
        with mujoco_lock, homing_lock:
            stop_jog = getattr(env, "stop_jog", None)
            if callable(stop_jog) and homing_state.get("is_jogging", False):
                stop_jog()
            homing_state["active"] = False
            homing_state["is_jogging"] = False
            homing_state.pop("previous_errors", None)
            homing_state.pop("current_velocities", None)
        _set_jogging_paused(previous_pause)
    with mujoco_lock:
        # Sample the final joint state for the response payload.
        home_pose = getattr(env, "home_pose", None)
        get_joints = getattr(env, "get_joint_positions", None)
        final_positions = get_joints() if callable(get_joints) else []
    home_pose_list = (
        home_pose.tolist()
        if home_pose is not None and hasattr(home_pose, "tolist")
        else (list(home_pose) if home_pose is not None else None)
    )
    final_errors = None
    max_abs_error = None
    if home_pose_list is not None and final_positions is not None and len(final_positions) > 0:
        # Per-joint residual error (target - current) and its worst case.
        final_errors = [float(t - c) for t, c in zip(home_pose_list, final_positions)]
        max_abs_error = max(abs(e) for e in final_errors)
    final_positions_list = [float(v) for v in final_positions] if final_positions is not None and len(final_positions) > 0 else None
    response = {
        "status": status,
        "robot": current_robot,
        "homed": status == "completed",
        "duration_s": time.time() - start_time,
        "home_pose": home_pose_list,
        "final_positions": final_positions_list,
        "final_errors": final_errors,
        "max_abs_error": max_abs_error,
        "jogging_paused": _is_jogging_paused(),
    }
    return jsonify(response)
if __name__ == '__main__':
    # Honor the PORT env var (e.g. set by the HF Space runtime), else 3004.
    listen_port = int(os.environ.get("PORT", "3004"))
    app.run(host='0.0.0.0', port=listen_port, debug=False, threaded=True)