import os import sys import time import threading import json import base64 import math import re import traceback import io import cv2 import numpy as np def _configure_mujoco_backend() -> None: if os.environ.get("NOVA_MUJOCO_GPU", "").lower() in {"1", "true", "yes", "on"}: os.environ["MUJOCO_GL"] = os.environ.get("MUJOCO_GL", "egl") os.environ["PYOPENGL_PLATFORM"] = os.environ.get("PYOPENGL_PLATFORM", "egl") else: os.environ["MUJOCO_GL"] = os.environ.get("MUJOCO_GL", "osmesa") os.environ["PYOPENGL_PLATFORM"] = os.environ.get("PYOPENGL_PLATFORM", "osmesa") _configure_mujoco_backend() import mujoco from functools import wraps from pathlib import Path from typing import Any, Optional from flask import Flask, Response, render_template_string, request, jsonify, make_response from flask_sock import Sock # Add robots directory to path for imports _nova_sim_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, os.path.join(_nova_sim_dir, 'robots', 'g1')) from g1_env import G1Env sys.path.pop(0) # Load .env.local for Nova API configuration def load_env_file(filepath): """Load environment variables from .env file.""" if not filepath.exists(): return with open(filepath, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() env_local_path = Path(_nova_sim_dir) / '.env.local' load_env_file(env_local_path) # API prefix for all endpoints API_PREFIX = '/nova-sim/api/v1' # Determine if Nova credentials are present (required for default toggle state) def _nova_credentials_present() -> bool: instance_url = os.getenv('NOVA_INSTANCE_URL') or os.getenv('NOVA_API') return all([ instance_url, os.getenv('NOVA_ACCESS_TOKEN'), os.getenv('NOVA_CONTROLLER_ID'), os.getenv('NOVA_MOTION_GROUP_ID'), ]) app = Flask(__name__) app.config['SECRET_KEY'] = 'robotsim-secret' sock = Sock(app) # Detect if running in Docker (check for /.dockerenv or cgroup) def is_running_in_docker(): if os.path.exists('/.dockerenv'): return True try: with open('/proc/1/cgroup', 'r') as f: return 'docker' in f.read() except: return False IN_DOCKER = is_running_in_docker() # Performance settings from environment # Native defaults: 1280x720 @ 60 FPS (GPU accelerated) # Docker defaults: 640x360 @ 30 FPS (software rendering) RENDER_WIDTH = int(os.environ.get('RENDER_WIDTH', 640 if IN_DOCKER else 1280)) RENDER_HEIGHT = int(os.environ.get('RENDER_HEIGHT', 360 if IN_DOCKER else 720)) TARGET_FPS = int(os.environ.get('TARGET_FPS', 30 if IN_DOCKER else 60)) SIM_STEPS_PER_FRAME = int(os.environ.get('SIM_STEPS_PER_FRAME', 10 if IN_DOCKER else 5)) # Overlay camera render resolution; keep auxiliary streams lean. OVERLAY_RENDER_WIDTH = int(os.environ.get('OVERLAY_RENDER_WIDTH', 240)) OVERLAY_RENDER_HEIGHT = int(os.environ.get('OVERLAY_RENDER_HEIGHT', 160)) # Current robot type current_robot = "g1" # "g1", "spot", "ur5", or "ur5_t_push" current_scene = None # Environment instances (lazy loaded) env_g1 = None env_spot = None env_ur5 = None env_ur5_t_push = None env = None # Active environment # Simulation state running = True latest_frame = None frame_lock = threading.Lock() mujoco_lock = threading.Lock() renderer = None needs_robot_switch = None # Robot to switch to episode_control_state = { "terminate": False, "truncate": False, "start_time": 0.0, # Timestamp when episode started "max_duration": 120.0, # Max episode duration in seconds (safety limit) } # Perception state - stores latest detected object poses perception_state = { "poses": [], # List of {object_id, position, orientation, confidence} "last_update": 0, # Timestamp of last update } perception_lock = threading.Lock() episode_control_lock = threading.Lock() # Latest teleoperation action (for trainer state) # Initialize with zero values instead of None so it's always a dict last_teleop_action: dict[str, Any] = { "vx": 0.0, "vy": 0.0, "vz": 0.0, "vyaw": 0.0, "gripper": 0.0, # 0-255 for UR5 gripper, 0 for robots without grippers # Cartesian rotation velocities (rad/s) "vrx": 0.0, "vry": 0.0, "vrz": 0.0, # Joint velocities (rad/s) "j1": 0.0, "j2": 0.0, "j3": 0.0, "j4": 0.0, "j5": 0.0, "j6": 0.0, } teleop_lock = threading.Lock() last_teleop_command_time = 0.0 # Timestamp of last teleop command (for debugging) jogging_active = False # Track if jogging is currently active # Homing state homing_state = { "active": False, "current_joint": None, "joint_errors": [], "current_velocity": 0.5, # Max velocity for homing "previous_error": None, "previous_abs_error": None, "stuck_count": 0, "is_jogging": False, "current_direction": None, "saved_velocity": None, # Store velocity to restore after homing } homing_lock = threading.Lock() jogging_pause_lock = threading.Lock() jogging_paused = False # WebSocket clients ws_clients = set() ws_clients_lock = threading.Lock() external_ws_clients = set() # External clients (not UI) that can send notifications external_ws_clients_lock = threading.Lock() client_metadata: dict = {} # Metadata for all external clients client_metadata_lock = threading.Lock() # Camera state for orbit controls cam = mujoco.MjvCamera() cam.azimuth = 135 cam.elevation = -20 cam.distance = 3.0 cam.lookat = np.array([0, 0, 0.8]) # Camera follow mode camera_follow = True # Target visibility for push-T scene t_target_visible = True def _reset_camera_for_current_robot() -> None: """Reset the orbit camera to the defaults for the active robot.""" cam.azimuth = 135 cam.elevation = -20 if current_robot == "g1": cam.distance = 3.0 cam.lookat = np.array([0.0, 0.0, 0.8]) elif current_robot in ("ur5", "ur5_t_push"): cam.distance = 1.8 cam.lookat = np.array([0.3, 0.0, 0.6]) cam.azimuth = 150 cam.elevation = -25 else: cam.distance = 2.5 cam.lookat = np.array([0.0, 0.0, 0.4]) def _reset_active_environment() -> None: """Reset the primary simulation environment and associated camera.""" with mujoco_lock: if env is not None: env.reset() _reset_camera_for_current_robot() # Track episode start time for duration watchdog with episode_control_lock: episode_control_state["start_time"] = time.time() episode_control_state["terminate"] = False episode_control_state["truncate"] = False def _schedule_robot_switch(robot: str, scene: Optional[str]) -> None: """Schedule a robot/scene switch for the simulation loop.""" global needs_robot_switch needs_robot_switch = {"robot": robot, "scene": scene} MODEL_ROOT = Path(_nova_sim_dir) / "robots" def discover_scene_files(robot: str) -> list[str]: """Return a list of available scene names for a robot.""" model_dir = MODEL_ROOT / robot / "model" if not model_dir.exists(): return [] scenes = [] for file in sorted(model_dir.glob("*.xml")): scenes.append(file.stem) return scenes ROBOT_SCENES = { "ur5": ["scene_with_gripper"], "ur5_t_push": ["scene_t_push"], "g1": ["scene"], "spot": ["scene"], } ROBOT_LABELS = { "ur5": "Universal Robots UR5e", "ur5_t_push": "UR5e T-Push Scene", "g1": "Unitree G1 Humanoid", "spot": "Boston Dynamics Spot (Quadruped)", } DEFAULT_SCENES = { "ur5": "scene_with_gripper", "ur5_t_push": "scene_t_push", "g1": "scene", "spot": "scene", } AVAILABLE_ACTIONS = [ "action", "reset", "switch_robot", "camera", "camera_follow", "toggle_target_visibility", "teleop_action", "arm_target", "gripper", "control_mode", "joint_positions", "arm_orientation", "use_orientation", ] OVERLAY_CAMERA_PRESETS: dict[str, list[dict[str, Any]]] = {} overlay_camera_states: dict[str, dict[str, Any]] = {} overlay_frames: dict[str, bytes | None] = {} overlay_frame_lock = threading.Lock() overlay_camera_lock = threading.Lock() dynamic_camera_configs: dict[str, dict[str, dict[str, Any]]] = {} def _make_mjv_camera(config: dict[str, float]) -> mujoco.MjvCamera: cam_obj = mujoco.MjvCamera() cam_obj.lookat = np.array(config.get("lookat", [0.0, 0.0, 0.0]), dtype=np.float32) cam_obj.distance = float(config.get("distance", 1.0)) cam_obj.azimuth = float(config.get("azimuth", 0.0)) cam_obj.elevation = float(config.get("elevation", -30.0)) return cam_obj def _build_overlay_camera_state(config: dict[str, Any], create_renderer: bool = True) -> dict[str, Any]: """Build overlay camera state payload from config.""" cam_obj = _make_mjv_camera(config) renderer_obj = None if create_renderer and env is not None: renderer_obj = mujoco.Renderer( env.model, height=OVERLAY_RENDER_HEIGHT, width=OVERLAY_RENDER_WIDTH ) follow_site = config.get("follow_site") site_id = -1 if follow_site and env is not None: site_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, follow_site) offset = np.array(config.get("offset", [0.0, 0.0, 0.0]), dtype=np.float32) state_payload = { "camera": cam_obj, "renderer": renderer_obj, "label": config.get("label", config["name"]), "follow_site": follow_site, "site_id": site_id, "offset": offset, "forward_offset": float(config.get("forward_offset", 0.0)), "track_orientation": bool(config.get("track_orientation")), "look_target_site": config.get("look_target_site"), "look_target_id": -1, } look_target_site = config.get("look_target_site") if look_target_site and env is not None: look_target_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, look_target_site) state_payload["look_target_id"] = look_target_id return state_payload def _normalize_vec(vec: np.ndarray) -> np.ndarray: norm = np.linalg.norm(vec) if norm < 1e-8: return vec return vec / norm def _rotation_matrix_to_quaternion(mat: np.ndarray) -> dict[str, float]: """Convert a 3x3 rotation matrix to quaternion (w, x, y, z).""" m00, m01, m02 = mat[0, 0], mat[0, 1], mat[0, 2] m10, m11, m12 = mat[1, 0], mat[1, 1], mat[1, 2] m20, m21, m22 = mat[2, 0], mat[2, 1], mat[2, 2] trace = m00 + m11 + m22 if trace > 0.0: s = math.sqrt(trace + 1.0) * 2.0 w = 0.25 * s x = (m21 - m12) / s y = (m02 - m20) / s z = (m10 - m01) / s elif m00 > m11 and m00 > m22: s = math.sqrt(1.0 + m00 - m11 - m22) * 2.0 w = (m21 - m12) / s x = 0.25 * s y = (m01 + m10) / s z = (m02 + m20) / s elif m11 > m22: s = math.sqrt(1.0 + m11 - m00 - m22) * 2.0 w = (m02 - m20) / s x = (m01 + m10) / s y = 0.25 * s z = (m12 + m21) / s else: s = math.sqrt(1.0 + m22 - m00 - m11) * 2.0 w = (m10 - m01) / s x = (m02 + m20) / s y = (m12 + m21) / s z = 0.25 * s return {"w": float(w), "x": float(x), "y": float(y), "z": float(z)} def _camera_axes_from_orbit(cam_obj: mujoco.MjvCamera) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Return (right, up, forward) axes for an orbit camera in world coordinates.""" azimuth = math.radians(float(cam_obj.azimuth)) elevation = math.radians(float(cam_obj.elevation)) right = np.array([math.sin(azimuth), -math.cos(azimuth), 0.0], dtype=np.float32) up = np.array([ -math.cos(azimuth) * math.sin(elevation), -math.sin(azimuth) * math.sin(elevation), math.cos(elevation), ], dtype=np.float32) forward = np.array([ -math.cos(azimuth) * math.cos(elevation), -math.sin(azimuth) * math.cos(elevation), -math.sin(elevation), ], dtype=np.float32) return _normalize_vec(right), _normalize_vec(up), _normalize_vec(forward) def _camera_pose(cam_obj: mujoco.MjvCamera) -> dict[str, dict[str, float]]: """Compute camera pose (position + orientation) in world coordinates.""" lookat = np.array(cam_obj.lookat, dtype=np.float32) right, up, forward = _camera_axes_from_orbit(cam_obj) position = lookat - forward * float(cam_obj.distance) rotation = np.column_stack((right, up, forward)) quat = _rotation_matrix_to_quaternion(rotation) return { "position": {"x": float(position[0]), "y": float(position[1]), "z": float(position[2])}, "orientation": quat, } def _camera_fovy_degrees(cam_obj: mujoco.MjvCamera, env_obj) -> float: fovy = float(getattr(cam_obj, "fovy", 0.0) or 0.0) if fovy <= 0.0 and env_obj is not None: fovy = float(env_obj.model.vis.global_.fovy) return float(fovy) if fovy > 0.0 else 45.0 def _camera_intrinsics(width: int, height: int, fovy_degrees: float) -> dict[str, float]: """Compute pinhole intrinsics from image size and vertical field-of-view.""" fovy_rad = math.radians(float(fovy_degrees)) fy = (float(height) / 2.0) / math.tan(fovy_rad / 2.0) fx = fy return { "fx": float(fx), "fy": float(fy), "cx": float(width) / 2.0, "cy": float(height) / 2.0, "width": int(width), "height": int(height), "fovy_degrees": float(fovy_degrees), } def _get_site_forward(env_obj, site_id: int) -> np.ndarray: """Compute the forward (X axis) vector for a site.""" default = np.array([0.0, 0.0, 1.0], dtype=np.float32) if env_obj is None or site_id < 0: return default if site_id >= getattr(env_obj.model, "nsite", 0): return default xmat = getattr(env_obj.data, "site_xmat", None) if xmat is None or xmat.size < (site_id + 1) * 9: return default start = site_id * 9 mat = xmat[start:start + 9] if mat.size < 9: return default try: forward = np.array([mat[0], mat[3], mat[6]], dtype=np.float32) except IndexError: return default norm = np.linalg.norm(forward) if norm < 1e-6: return default return forward / norm def _close_overlay_renderers(): global overlay_camera_states with overlay_camera_lock: for state in overlay_camera_states.values(): renderer_obj = state.get("renderer") if renderer_obj: renderer_obj.close() overlay_camera_states = {} with overlay_frame_lock: overlay_frames.clear() def _camera_scope_key(robot_type: str | None, scene_name: str | None) -> str: return f"{robot_type or ''}:{scene_name or ''}" def _get_dynamic_camera_configs(robot_type: str | None, scene_name: str | None) -> list[dict[str, Any]]: key = _camera_scope_key(robot_type, scene_name) configs = dynamic_camera_configs.get(key, {}) return list(configs.values()) def _store_dynamic_camera_config(robot_type: str | None, scene_name: str | None, config: dict[str, Any]) -> None: key = _camera_scope_key(robot_type, scene_name) if key not in dynamic_camera_configs: dynamic_camera_configs[key] = {} dynamic_camera_configs[key][config["name"]] = config def prepare_overlay_renderers(robot_type: str, scene_name: str | None): """Create overlay renderers for the active robot/scene (if configured).""" _close_overlay_renderers() configs = [] if robot_type and robot_type in OVERLAY_CAMERA_PRESETS: configs = OVERLAY_CAMERA_PRESETS[robot_type] elif scene_name and scene_name in OVERLAY_CAMERA_PRESETS: configs = OVERLAY_CAMERA_PRESETS[scene_name] dynamic_configs = _get_dynamic_camera_configs(robot_type, scene_name) if dynamic_configs: merged: dict[str, dict[str, Any]] = {} for config in configs: merged[config["name"]] = config for config in dynamic_configs: merged[config["name"]] = config configs = list(merged.values()) if not configs: return for config in configs: state_payload = _build_overlay_camera_state(config) with overlay_camera_lock: overlay_camera_states[config["name"]] = state_payload with overlay_frame_lock: overlay_frames[config["name"]] = None def _parse_bool(value): if value is None: return False return str(value).strip().lower() in ("1", "true", "yes", "on") def _coerce_vec3(value: Any, default: list[float]) -> list[float]: if value is None: return default if isinstance(value, dict): value = [value.get("x"), value.get("y"), value.get("z")] if not isinstance(value, (list, tuple)) or len(value) != 3: raise ValueError("Expected a 3-element vector") return [float(value[0]), float(value[1]), float(value[2])] def _valid_camera_name(name: str) -> bool: if not name: return False if name == "main": return False return re.match(r"^[A-Za-z0-9_-]+$", name) is not None def _build_dynamic_camera_config(payload: dict[str, Any]) -> dict[str, Any]: name = str(payload.get("name", "")).strip() if not _valid_camera_name(name): raise ValueError("Invalid camera name. Use letters, numbers, hyphens, or underscores.") label = str(payload.get("label") or name).strip() default_lookat = [float(cam.lookat[0]), float(cam.lookat[1]), float(cam.lookat[2])] default_distance = float(cam.distance) default_azimuth = float(cam.azimuth) default_elevation = float(cam.elevation) lookat = _coerce_vec3(payload.get("lookat"), default_lookat) distance = float(payload.get("distance", default_distance)) azimuth = float(payload.get("azimuth", default_azimuth)) elevation = float(payload.get("elevation", default_elevation)) config: dict[str, Any] = { "name": name, "label": label, "lookat": lookat, "distance": distance, "azimuth": azimuth, "elevation": elevation, } for key in ("follow_site", "look_target_site"): if payload.get(key) is not None: config[key] = str(payload.get(key)) if payload.get("offset") is not None: config["offset"] = _coerce_vec3(payload.get("offset"), [0.0, 0.0, 0.0]) if payload.get("forward_offset") is not None: config["forward_offset"] = float(payload.get("forward_offset", 0.0)) if payload.get("track_orientation") is not None: config["track_orientation"] = bool(payload.get("track_orientation")) return config def _load_nova_ur5_config_from_env(): use_state = _parse_bool(os.environ.get("NOVA_UR5_USE_STATE_STREAM") or os.environ.get("NOVA_USE_STATE_STREAM")) use_ik = _parse_bool(os.environ.get("NOVA_UR5_USE_IK") or os.environ.get("NOVA_USE_IK")) if not (use_state or use_ik): return None instance_url = os.environ.get("NOVA_INSTANCE_URL") access_token = os.environ.get("NOVA_ACCESS_TOKEN") cell_id = os.environ.get("NOVA_CELL_ID") controller_id = os.environ.get("NOVA_CONTROLLER_ID") motion_group_id = os.environ.get("NOVA_MOTION_GROUP_ID") missing = [key for key, value in { "NOVA_INSTANCE_URL": instance_url, "NOVA_ACCESS_TOKEN": access_token, "NOVA_CELL_ID": cell_id, "NOVA_CONTROLLER_ID": controller_id, "NOVA_MOTION_GROUP_ID": motion_group_id, }.items() if not value] if missing: print(f"Nova API integration disabled; missing env vars: {', '.join(missing)}") return None return { "instance_url": instance_url, "access_token": access_token, "cell_id": cell_id, "controller_id": controller_id, "motion_group_id": motion_group_id, "motion_group_model": os.environ.get("NOVA_MOTION_GROUP_MODEL"), "tcp_name": os.environ.get("NOVA_TCP_NAME"), "response_rate_ms": int(os.environ.get("NOVA_RESPONSE_RATE_MS", "200")), "use_state_stream": use_state, "use_ik": use_ik, } def init_g1(): """Initialize G1 environment.""" global env_g1 if env_g1 is None: env_g1 = G1Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT) env_g1.scene_name = "scene" env_g1.reset() return env_g1 env_g1.scene_name = "scene" return env_g1 def init_spot(): """Initialize Spot environment with PyMPC controller.""" global env_spot if env_spot is None: # Import SpotEnv from robots/spot directory spot_dir = os.path.join(_nova_sim_dir, 'robots', 'spot') sys.path.insert(0, spot_dir) from spot_env import SpotEnv sys.path.pop(0) # Use pympc controller if available, otherwise fallback to mpc_gait env_spot = SpotEnv(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, controller_type='pympc') env_spot.scene_name = "scene" env_spot.reset() return env_spot env_spot.scene_name = "scene" return env_spot def init_ur5(scene_name=DEFAULT_SCENES["ur5"]): """Initialize UR5e environment.""" global env_ur5, env_ur5_t_push # Import UR5Env from robots/ur5 directory ur5_dir = os.path.join(_nova_sim_dir, 'robots', 'ur5') sys.path.insert(0, ur5_dir) from ur5_env import UR5Env sys.path.pop(0) # Check if Nova API is configured via environment variables nova_config = None if os.getenv('NOVA_INSTANCE_URL') and os.getenv('NOVA_ACCESS_TOKEN'): # Nova API is configured - attempt bidirectional control # State streaming: Mirror real robot pose while still running physics for objects (T-shape) # Jogging: UI can send directional jog commands to control the real robot # Note: If Nova connection fails, UR5Env will automatically fall back to internal IK print("Nova API credentials detected - attempting bidirectional control") nova_config = { "use_state_stream": True, # Attempt to mirror real robot pose "use_ik": False, # Use jogging API instead of Nova IK (falls back to internal IK if connection fails) "use_jogging": True # Attempt bidirectional robot control } if scene_name == "scene_t_push": if env_ur5_t_push is None: env_ur5_t_push = UR5Env( render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, scene_name="scene_t_push", nova_api_config=nova_config ) env_ur5_t_push.reset() return env_ur5_t_push if env_ur5 is None: env_ur5 = UR5Env( render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, nova_api_config=nova_config ) env_ur5.reset() return env_ur5 def switch_robot(robot_type, scene_name=None): """Switch to a different robot.""" global env, current_robot, current_scene, renderer, cam active_scene = None if robot_type == current_robot and env is not None: return # Close old renderer if renderer is not None: renderer.close() renderer = None if robot_type == "g1": env = init_g1() cam.lookat = np.array([0, 0, 0.8]) cam.distance = 3.0 elif robot_type == "spot": env = init_spot() cam.lookat = np.array([0, 0, 0.4]) cam.distance = 2.5 elif robot_type == "ur5": selected_scene = scene_name or DEFAULT_SCENES.get("ur5") env = init_ur5(selected_scene) active_scene = selected_scene cam.lookat = np.array([0.3, 0, 0.6]) cam.distance = 1.8 cam.azimuth = 150 cam.elevation = -25 elif robot_type == "ur5_t_push": selected_scene = scene_name or "scene_t_push" env = init_ur5(selected_scene) active_scene = selected_scene cam.lookat = np.array([0.5, 0, 0.55]) cam.distance = 1.9 cam.azimuth = 150 cam.elevation = -25 else: print(f"Unknown robot type: {robot_type}") return current_robot = robot_type current_scene = active_scene env.reset() # Track episode start time for duration watchdog with episode_control_lock: episode_control_state["start_time"] = time.time() episode_control_state["terminate"] = False episode_control_state["truncate"] = False # Initialize gripper value in teleop_action for UR5 with teleop_lock: if robot_type in ("ur5", "ur5_t_push"): has_gripper = getattr(env, "has_gripper", False) if has_gripper: # Get current gripper value from env gripper_val = getattr(env, "get_gripper", lambda: 128)() last_teleop_action["gripper"] = float(gripper_val) else: last_teleop_action["gripper"] = 0.0 else: # Locomotion robots don't have grippers last_teleop_action["gripper"] = 0.0 # Create new renderer renderer = mujoco.Renderer(env.model, height=env.height, width=env.width) prepare_overlay_renderers(robot_type, active_scene) # Initialize with G1 env = init_g1() def _get_scene_objects(): """Extract scene objects (position and orientation) from the environment.""" if env is None: return [] scene_objects = [] # For UR5 scenes, extract T-shape objects if they exist if current_robot in ("ur5", "ur5_t_push"): # Get t_object body if it exists t_object_id = getattr(env, 't_object_body_id', -1) if t_object_id >= 0: pos = env.data.xpos[t_object_id] # MuJoCo stores quaternions in data.xquat as [w, x, y, z] quat = env.data.xquat[t_object_id] scene_objects.append({ 'name': 't_object', 'position': {'x': float(pos[0]), 'y': float(pos[1]), 'z': float(pos[2])}, 'orientation': {'w': float(quat[0]), 'x': float(quat[1]), 'y': float(quat[2]), 'z': float(quat[3])} }) # Get t_target body if it exists t_target_id = getattr(env, 't_target_body_id', -1) if t_target_id >= 0: pos = env.data.xpos[t_target_id] quat = env.data.xquat[t_target_id] scene_objects.append({ 'name': 't_target', 'position': {'x': float(pos[0]), 'y': float(pos[1]), 'z': float(pos[2])}, 'orientation': {'w': float(quat[0]), 'x': float(quat[1]), 'y': float(quat[2]), 'z': float(quat[3])} }) # Get box body if it exists try: box_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_BODY, "box") if box_id >= 0: pos = env.data.xpos[box_id] quat = env.data.xquat[box_id] scene_objects.append({ 'name': 'box', 'position': {'x': float(pos[0]), 'y': float(pos[1]), 'z': float(pos[2])}, 'orientation': {'w': float(quat[0]), 'x': float(quat[1]), 'y': float(quat[2]), 'z': float(quat[3])} }) except: pass return scene_objects def broadcast_state(): """Broadcast robot state to all connected WebSocket clients.""" with mujoco_lock: if env is None: return obs = env._get_obs() cmd = env.get_command() steps = env.steps command = { "vx": float(cmd[0]) if len(cmd) > 0 else 0.0, "vy": float(cmd[1]) if len(cmd) > 1 else 0.0, "vyaw": float(cmd[2]) if len(cmd) > 2 else 0.0, "vz": 0.0, } with teleop_lock: teleop_snapshot = last_teleop_action.copy() reward_value = None if hasattr(env, "get_task_reward"): reward_value = env.get_task_reward() # Build lists of identified and unidentified clients with client_metadata_lock: identified_clients = [] unidentified_clients = [] for ws, info in list(client_metadata.items()): identity = info.get("identity") if identity: # Client has identified themselves if ws in external_ws_clients: identified_clients.append(identity) else: # Client connected but hasn't sent identity yet unidentified_clients.append(f"unidentified-{id(ws) % 10000}") # Get scene objects scene_objects = _get_scene_objects() # UR5 has different state structure if current_robot in ("ur5", "ur5_t_push"): ee_pos = env.get_end_effector_pos() ee_quat = env.get_end_effector_quat() target = env.get_target() target_euler = env.get_target_orientation() gripper = env.get_gripper() joint_pos = env.get_joint_positions() joint_targets = env.get_joint_targets() control_mode = env.get_control_mode() # Check Nova API status nova_client = getattr(env, '_nova_client', None) nova_state_streaming = getattr(env, '_use_nova_state_stream', False) nova_ik = getattr(env, '_use_nova_ik', False) # Check if actually connected (WebSocket active and receiving data) nova_connected = False if nova_client is not None: if hasattr(nova_client, 'is_state_stream_connected'): # Check actual stream connection status nova_connected = nova_client.is_state_stream_connected() elif hasattr(nova_client, 'is_connected'): # Fallback: check general connection status nova_connected = nova_client.is_connected() else: # If client exists but we can't verify connection, assume connected # This handles the IK-only case where state streaming isn't used nova_connected = True nova_available = False nova_enabled_pref = False has_nova_conf = getattr(env, 'has_nova_configuration', None) if callable(has_nova_conf): nova_available = has_nova_conf() is_nova_enabled = getattr(env, 'is_nova_enabled', None) if callable(is_nova_enabled): nova_enabled_pref = is_nova_enabled() state_msg = json.dumps({ 'type': 'state', 'data': { 'robot': current_robot, 'scene': current_scene, 'observation': { 'end_effector': {'x': float(ee_pos[0]), 'y': float(ee_pos[1]), 'z': float(ee_pos[2])}, 'ee_orientation': {'w': float(ee_quat[0]), 'x': float(ee_quat[1]), 'y': float(ee_quat[2]), 'z': float(ee_quat[3])}, 'ee_target': {'x': float(target[0]), 'y': float(target[1]), 'z': float(target[2])}, 'ee_target_orientation': {'roll': float(target_euler[0]), 'pitch': float(target_euler[1]), 'yaw': float(target_euler[2])}, 'gripper': float(gripper), 'joint_positions': [float(j) for j in joint_pos], 'joint_targets': [float(j) for j in joint_targets], }, 'control_mode': control_mode, 'steps': int(steps), 'reward': reward_value, 'teleop_action': teleop_snapshot, 'scene_objects': scene_objects, 'nova_api': { 'connected': nova_connected, 'state_streaming': nova_state_streaming, 'ik': nova_ik, 'available': nova_available, 'enabled': nova_enabled_pref }, 'connected_clients': identified_clients, 'unidentified_clients': unidentified_clients } }) else: # Locomotion robots (G1, Spot) base_pos = obs[0:3] # x, y, z position base_quat = obs[3:7] # quaternion orientation (w, x, y, z) state_msg = json.dumps({ 'type': 'state', 'data': { 'robot': current_robot, 'scene': current_scene, 'observation': { 'position': {'x': float(base_pos[0]), 'y': float(base_pos[1]), 'z': float(base_pos[2])}, 'orientation': {'w': float(base_quat[0]), 'x': float(base_quat[1]), 'y': float(base_quat[2]), 'z': float(base_quat[3])}, }, 'steps': int(steps), 'reward': reward_value, 'teleop_action': teleop_snapshot, 'scene_objects': scene_objects, 'connected_clients': identified_clients, 'unidentified_clients': unidentified_clients } }) # Send to all connected clients with ws_clients_lock: dead_clients = set() for ws in ws_clients: try: ws.send(state_msg) except Exception: dead_clients.add(ws) # Remove dead clients ws_clients.difference_update(dead_clients) def broadcast_camera_event(action: str, camera_payload: dict[str, Any], scope: dict[str, Any]) -> None: """Announce camera changes via the state stream.""" message = { "type": "state", "data": { "camera_event": { "action": action, "camera": camera_payload, "scope": scope, "timestamp": time.time(), } } } with ws_clients_lock: dead_clients = set() for ws in ws_clients: try: ws.send(json.dumps(message)) except Exception: dead_clients.add(ws) ws_clients.difference_update(dead_clients) def _handle_external_client_message(ws, data): """Process message payloads originating from external clients.""" msg_type = data.get("type") if msg_type in ("client_identity", "trainer_identity"): # Support old name for backward compat payload = data.get("data", {}) or {} identity = payload.get("client_id") or payload.get("trainer_id") or payload.get("name") or "client" with external_ws_clients_lock: external_ws_clients.add(ws) # Don't call _register_external_client again - client is already registered on connection _set_client_identity(ws, identity) broadcast_state() broadcast_connected_clients_status() return if msg_type in ("client_notification", "notification"): # Support old name for backward compat payload = data.get("data", {}) payload.setdefault("timestamp", time.time()) broadcast_notification_to_all(payload) def _build_connected_clients_payload(): """Build a summary payload describing connected external clients.""" with client_metadata_lock: identified = [] unidentified = [] for ws, info in client_metadata.items(): identity = info.get("identity") if identity: # Client has identified themselves if ws in external_ws_clients: identified.append(identity) else: # Client connected but hasn't sent identity yet unidentified.append(f"unidentified-{id(ws) % 10000}") payload = { "clients": identified, "unidentified_clients": unidentified, } return payload def _register_external_client(ws): with client_metadata_lock: client_metadata[ws] = {"identity": None, "connected_at": time.time()} def _unregister_external_client(ws): with client_metadata_lock: client_metadata.pop(ws, None) def _set_client_identity(ws, identity: str): with client_metadata_lock: info = client_metadata.get(ws) if not info: # Client wasn't registered on connection (shouldn't happen with new flow) # Register them now for backward compatibility client_metadata[ws] = {"identity": identity, "connected_at": time.time(), "last_seen": time.time()} return info["identity"] = identity info["last_seen"] = time.time() def broadcast_connected_clients_status(): """Notify all clients about connected external clients.""" payload = _build_connected_clients_payload() message = json.dumps({"type": "connected_clients", "data": payload}) with ws_clients_lock: dead_clients = set() for client in ws_clients: try: client.send(message) except: dead_clients.add(client) ws_clients.difference_update(dead_clients) def broadcast_to_external_clients(event_type: str, payload: dict): """Broadcast a structured event to connected external WebSocket clients.""" message = json.dumps({"type": event_type, "data": payload}) with external_ws_clients_lock: dead_clients = set() for ws in external_ws_clients: try: ws.send(message) except Exception: dead_clients.add(ws) external_ws_clients.difference_update(dead_clients) def broadcast_notification_to_all(payload: dict): """Send a notification message (from external client) to all clients.""" notification = json.dumps({"type": "client_notification", "data": payload}) with ws_clients_lock: dead_clients = set() for ws in ws_clients: try: ws.send(notification) except Exception: dead_clients.add(ws) ws_clients.difference_update(dead_clients) def _safe_ws_send(ws, message: dict): """Send JSON message over WebSocket without raising.""" try: ws.send(json.dumps(message)) except Exception: pass def _signal_episode_control(action: str): """Set episode control flags and notify trainer/UI clients.""" action = (action or "").lower() if action not in ("terminate", "truncate"): return with episode_control_lock: episode_control_state[action] = True episode_control_state["start_time"] = 0.0 # Reset timer status_label = "terminated" if action == "terminate" else "truncated" payload = { "action": action, "status": status_label, "source": "ui", "timestamp": time.time(), } broadcast_to_external_clients("episode_event", payload) # Reset UI environment so the user can continue without manual reset with mujoco_lock: if env is not None: env.reset() # Track new episode start time with episode_control_lock: episode_control_state["start_time"] = time.time() episode_control_state["terminate"] = False episode_control_state["truncate"] = False def _consume_episode_control_flags(): """Return and clear the active episode control flags.""" with episode_control_lock: terminate = episode_control_state.get("terminate", False) truncate = episode_control_state.get("truncate", False) episode_control_state["terminate"] = False episode_control_state["truncate"] = False return terminate, truncate def _quat_to_mat(quat): """Convert quaternion (w, x, y, z) to 3x3 rotation matrix. Args: quat: dict with keys 'w', 'x', 'y', 'z' or tuple/list [w, x, y, z] Returns: 3x3 rotation matrix as numpy array """ if isinstance(quat, dict): w, x, y, z = quat['w'], quat['x'], quat['y'], quat['z'] else: w, x, y, z = quat # Normalize quaternion norm = math.sqrt(w*w + x*x + y*y + z*z) if norm < 1e-10: return np.eye(3) w, x, y, z = w/norm, x/norm, y/norm, z/norm # Convert to rotation matrix mat = np.array([ [1 - 2*(y*y + z*z), 2*(x*y - w*z), 2*(x*z + w*y)], [ 2*(x*y + w*z), 1 - 2*(x*x + z*z), 2*(y*z - w*x)], [ 2*(x*z - w*y), 2*(y*z + w*x), 1 - 2*(x*x + y*y)] ]) return mat def _render_perception_frames(renderer_obj, model, data): """Render bounding boxes for detected object poses. Draws wireframe boxes at detected poses, color-coded by confidence. Green (high confidence >0.8), Yellow (medium 0.5-0.8), Red (low <0.5). """ with perception_lock: poses = perception_state.get("poses", []) last_update = perception_state.get("last_update", 0) # Don't render if data is too old (> 1 second) if time.time() - last_update > 1.0: return # Render bounding box for each detected pose for pose in poses: try: pos = pose.get("position", {}) orientation = pose.get("orientation", {}) confidence = pose.get("confidence", 0.0) dimensions = pose.get("dimensions") # Skip if no dimensions provided if not dimensions: continue # Extract position px = float(pos.get("x", 0.0)) py = float(pos.get("y", 0.0)) pz = float(pos.get("z", 0.0)) # Extract dimensions if isinstance(dimensions, dict): width = float(dimensions.get("width", 0.05)) height = float(dimensions.get("height", 0.05)) depth = float(dimensions.get("depth", 0.05)) elif isinstance(dimensions, (list, tuple)): width, height, depth = float(dimensions[0]), float(dimensions[1]), float(dimensions[2]) else: continue # Color based on confidence: high=green, medium=yellow, low=red if confidence > 0.8: rgba = [0.0, 1.0, 0.0, 0.5] # Green elif confidence > 0.5: rgba = [1.0, 1.0, 0.0, 0.5] # Yellow else: rgba = [1.0, 0.0, 0.0, 0.5] # Red # Convert quaternion to rotation matrix rot_mat = _quat_to_mat(orientation) # Render as box geometry # MuJoCo box size is half-extents, so divide by 2 mujoco.mjv_initGeom( renderer_obj.scene.geoms[renderer_obj.scene.ngeom], type=mujoco.mjtGeom.mjGEOM_BOX, size=[width/2, height/2, depth/2], pos=[px, py, pz], mat=rot_mat.flatten(), rgba=rgba ) renderer_obj.scene.ngeom += 1 # Limit to max geoms to avoid overflow if renderer_obj.scene.ngeom >= renderer_obj.scene.maxgeom - 10: break except Exception as e: # Silently skip invalid poses pass def simulation_loop(): global running, latest_frame, camera_follow, t_target_visible, renderer, needs_robot_switch, env print("Starting simulation loop...") # Initialize renderer renderer = mujoco.Renderer(env.model, height=env.height, width=env.width) physics_steps_per_frame = 5 broadcast_counter = 0 def _run_iteration(): nonlocal broadcast_counter global needs_robot_switch, latest_frame, jogging_active, last_teleop_command_time if needs_robot_switch is not None: with mujoco_lock: switch_robot( needs_robot_switch.get("robot"), scene_name=needs_robot_switch.get("scene") ) needs_robot_switch = None # Check episode duration watchdog current_time = time.time() with episode_control_lock: episode_start = episode_control_state.get("start_time", 0.0) max_duration = episode_control_state.get("max_duration", 120.0) if episode_start > 0 and (current_time - episode_start) > max_duration: # Episode has exceeded max duration - automatically truncate print(f"[Watchdog] Episode exceeded max duration ({max_duration}s) - truncating episode") episode_control_state["truncate"] = True episode_control_state["start_time"] = 0.0 # Reset timer # Stop any active jogging if jogging_active: with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): stop_jog_fn = getattr(env, "stop_jog", None) if callable(stop_jog_fn): try: stop_jog_fn() jogging_active = False except Exception as e: print(f"[Watchdog] Error stopping jogging during truncation: {e}") with mujoco_lock: if env is None: return sim_dt = env.model.opt.timestep # Run physics steps for _ in range(physics_steps_per_frame): env.step_with_controller(dt=sim_dt) # Update camera to follow robot or object if current_robot in ("ur5", "ur5_t_push"): # For UR5: follow t_object when enabled, table center when disabled if camera_follow: # Follow t_object if it exists t_object_id = getattr(env, 't_object_body_id', -1) if t_object_id >= 0: object_pos = env.data.xpos[t_object_id] cam.lookat[0] = object_pos[0] cam.lookat[1] = object_pos[1] cam.lookat[2] = object_pos[2] else: # Center on table cam.lookat[0] = 0.5 cam.lookat[1] = 0.0 cam.lookat[2] = 0.42 elif camera_follow: # For locomotion robots: follow robot base robot_pos = env.data.qpos[:3] cam.lookat[0] = robot_pos[0] cam.lookat[1] = robot_pos[1] cam.lookat[2] = 0.8 if current_robot == "g1" else 0.4 # Render if renderer is not None: renderer.update_scene(env.data, camera=cam) # Hide t_target if visibility is disabled (push-T scene only) if current_robot == "ur5_t_push" and not t_target_visible: t_target_id = getattr(env, 't_target_body_id', -1) if t_target_id >= 0: # Set alpha to 0 for all geoms belonging to t_target body for i in range(renderer.scene.ngeom): geom = renderer.scene.geoms[i] # Check if this geom belongs to t_target body by comparing position target_pos = env.data.xpos[t_target_id] geom_pos = geom.pos # If geom is very close to target body, it likely belongs to it dist = np.linalg.norm(np.array(geom_pos) - target_pos) if dist < 0.15: # Within 15cm - likely part of the T-shape geom.rgba[3] = 0.0 # Set alpha to 0 (invisible) # Add perception visualization markers _render_perception_frames(renderer, env.model, env.data) frame = renderer.render() else: return # Encode frame frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) ret, buffer = cv2.imencode('.jpg', frame_bgr) if ret: with frame_lock: latest_frame = buffer.tobytes() # Render overlay camera frames (if configured) with overlay_camera_lock: overlay_snapshot = list(overlay_camera_states.items()) for name, state in overlay_snapshot: renderer_obj = state.get("renderer") cam_obj = state.get("camera") if renderer_obj is None or cam_obj is None: if cam_obj is None: continue with mujoco_lock: if env is None: continue try: renderer_obj = mujoco.Renderer( env.model, height=OVERLAY_RENDER_HEIGHT, width=OVERLAY_RENDER_WIDTH ) except Exception: continue with overlay_camera_lock: if name in overlay_camera_states: overlay_camera_states[name]["renderer"] = renderer_obj if renderer_obj is None: continue if state.get("follow_site") and env is not None: site_id = state.get("site_id", -1) if site_id >= 0 and site_id < getattr(env.model, "nsite", 0): site_xpos = getattr(env.data, "site_xpos", None) if site_xpos is not None and site_xpos.size >= (site_id + 1) * 3: lookat_point = site_xpos[site_id] else: continue offset = state.get("offset") if offset is not None: lookat_point = lookat_point + offset forward_offset = state.get("forward_offset", 0.0) if forward_offset: forward = _get_site_forward(env, site_id) lookat_point = lookat_point + forward * forward_offset look_target_id = state.get("look_target_id", -1) target_point = None if look_target_id >= 0 and site_xpos.size >= (look_target_id + 1) * 3: target_point = site_xpos[look_target_id] cam_obj.lookat = target_point if target_point is not None else lookat_point if state.get("track_orientation"): forward = _get_site_forward(env, site_id) yaw = math.degrees(math.atan2(forward[1], forward[0])) pitch = math.degrees(math.asin(np.clip(forward[2], -1.0, 1.0))) cam_obj.azimuth = -yaw + 90.0 cam_obj.elevation = -pitch renderer_obj.update_scene(env.data, camera=cam_obj) # Hide t_target if visibility is disabled (push-T scene only) if current_robot == "ur5_t_push" and not t_target_visible: t_target_id = getattr(env, 't_target_body_id', -1) if t_target_id >= 0: # Set alpha to 0 for all geoms belonging to t_target body for i in range(renderer_obj.scene.ngeom): geom = renderer_obj.scene.geoms[i] # Check if this geom belongs to t_target body by comparing position target_pos = env.data.xpos[t_target_id] geom_pos = geom.pos # If geom is very close to target body, it likely belongs to it dist = np.linalg.norm(np.array(geom_pos) - target_pos) if dist < 0.15: # Within 15cm - likely part of the T-shape geom.rgba[3] = 0.0 # Set alpha to 0 (invisible) overlay_frame = renderer_obj.render() if overlay_frame is None: continue overlay_bgr = cv2.cvtColor(overlay_frame, cv2.COLOR_RGB2BGR) ret2, buffer2 = cv2.imencode('.jpg', overlay_bgr) if ret2: with overlay_frame_lock: overlay_frames[name] = buffer2.tobytes() # Broadcast state broadcast_counter += 1 if broadcast_counter >= 6: broadcast_counter = 0 broadcast_state() while running: loop_start = time.time() try: _run_iteration() except Exception as e: print(f"Error in simulation loop: {e}") import traceback traceback.print_exc() elapsed = time.time() - loop_start wait = max(0.001, 0.016 - elapsed) time.sleep(wait) if renderer: renderer.close() print("Simulation loop stopped.") # Start simulation thread sim_thread = threading.Thread(target=simulation_loop, daemon=True) sim_thread.start() def generate_frames(): while True: with frame_lock: if latest_frame is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + latest_frame + b'\r\n') time.sleep(0.04) def generate_overlay_frames(name: str): while True: with overlay_frame_lock: frame = overlay_frames.get(name) if frame is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') time.sleep(0.04) def _set_jogging_paused(paused: bool) -> bool: """Pause or resume jogging commands; returns the previous pause state.""" global jogging_paused with jogging_pause_lock: previous = jogging_paused jogging_paused = paused return previous def _is_jogging_paused() -> bool: with jogging_pause_lock: return jogging_paused def _step_multi_axis_homing_locked(tolerance: float = 0.01) -> dict: """Run one homing control step (expects mujoco_lock + homing_lock held).""" homing_state["last_home_time"] = time.time() if env is None or current_robot not in ("ur5", "ur5_t_push"): return {"status": "unsupported"} home_pose = getattr(env, "home_pose", None) get_joints = getattr(env, "get_joint_positions", None) start_multi_joint_jog = getattr(env, "start_multi_joint_jog", None) stop_jog = getattr(env, "stop_jog", None) if home_pose is None or not callable(get_joints) or not callable(start_multi_joint_jog) or not callable(stop_jog): return {"status": "unsupported"} if hasattr(home_pose, "tolist"): target_positions = home_pose.tolist() else: target_positions = list(home_pose) current_positions = get_joints() # Calculate errors for all joints errors = [] for joint_idx in range(min(len(target_positions), len(current_positions))): current = current_positions[joint_idx] target = target_positions[joint_idx] error = target - current errors.append(error) max_abs_error = max(abs(e) for e in errors) if errors else 0.0 if max_abs_error < tolerance: if homing_state.get("is_jogging", False): stop_jog() homing_state["is_jogging"] = False homing_state["active"] = False homing_state.pop("previous_errors", None) homing_state.pop("current_velocities", None) print("[Homing] Completed - all joints at target") return { "status": "done", "errors": errors, "max_abs_error": max_abs_error, } if "previous_errors" not in homing_state: homing_state["previous_errors"] = [0.0] * len(errors) base_velocity = 0.15 # rad/s - slower to prevent overshoot velocities = [] stop_tolerance = tolerance * 0.5 for joint_idx, error in enumerate(errors): abs_error = abs(error) prev_error = homing_state["previous_errors"][joint_idx] if prev_error != 0 and ((prev_error > 0 and error < 0) or (prev_error < 0 and error > 0)): velocities.append(0.0) continue if abs_error < stop_tolerance: velocities.append(0.0) else: if max_abs_error > 0: velocity = base_velocity * (abs_error / max_abs_error) else: velocity = 0.0 damping_zone = tolerance * 3.0 if abs_error < damping_zone: damping_factor = (abs_error / damping_zone) ** 2 velocity *= damping_factor min_velocity = 0.02 if velocity < min_velocity: velocities.append(0.0) else: velocities.append(velocity if error > 0 else -velocity) homing_state["previous_errors"] = errors if all(abs(v) < 0.001 for v in velocities): if homing_state.get("is_jogging", False): stop_jog() homing_state["is_jogging"] = False homing_state["active"] = False homing_state.pop("previous_errors", None) homing_state.pop("current_velocities", None) print("[Homing] All joints reached target or stopped") return { "status": "done", "errors": errors, "max_abs_error": max_abs_error, } current_velocities = homing_state.get("current_velocities") velocities_changed = current_velocities is None or any( abs(v1 - v2) > 0.01 for v1, v2 in zip(velocities, current_velocities) ) if velocities_changed: if homing_state.get("is_jogging", False): stop_jog() start_multi_joint_jog(velocities) homing_state["is_jogging"] = True homing_state["current_velocities"] = velocities homing_state["active"] = True return { "status": "running", "errors": errors, "max_abs_error": max_abs_error, "velocities": velocities, } def handle_ws_message(ws, data): """Handle incoming WebSocket message.""" global needs_robot_switch, camera_follow, t_target_visible, last_teleop_action, last_teleop_command_time, jogging_active msg_type = data.get('type') if msg_type in ("client_identity", "trainer_identity", "client_notification", "notification"): _handle_external_client_message(ws, data) return # Backward compatibility: map old message types to new ones if msg_type == 'command': msg_type = 'action' data['type'] = 'action' elif msg_type == 'teleop_command': msg_type = 'teleop_action' data['type'] = 'teleop_action' if msg_type == 'action': payload = data.get('data', {}) vx = payload.get('vx', 0.0) vy = payload.get('vy', 0.0) vz = payload.get('vz', 0.0) vyaw = payload.get('vyaw', 0.0) vrx = payload.get('vrx', 0.0) vry = payload.get('vry', 0.0) vrz = payload.get('vrz', 0.0) j1 = payload.get('j1', 0.0) j2 = payload.get('j2', 0.0) j3 = payload.get('j3', 0.0) j4 = payload.get('j4', 0.0) j5 = payload.get('j5', 0.0) j6 = payload.get('j6', 0.0) gripper = payload.get('gripper', None) if current_robot in ("ur5", "ur5_t_push") and _is_jogging_paused(): if gripper is not None: with mujoco_lock: if env is not None: env.set_gripper(float(gripper)) return with mujoco_lock: if env is not None: # For UR5: translate velocity actions to jogging commands if current_robot in ("ur5", "ur5_t_push"): # Handle gripper if specified if gripper is not None: env.set_gripper(float(gripper)) # Check which type of velocity is active (joint vs cartesian) joint_velocities = [float(j1), float(j2), float(j3), float(j4), float(j5), float(j6)] cartesian_translation = [float(vx), float(vy), float(vz)] cartesian_rotation = [float(vrx), float(vry), float(vrz)] any_joint = any(abs(v) > 0.001 for v in joint_velocities) any_cartesian = any(abs(v) > 0.001 for v in cartesian_translation + cartesian_rotation) # Update teleop command timestamp last_teleop_command_time = time.time() # Apply the appropriate jogging mode if any_joint: env.start_multi_joint_jog(joint_velocities) jogging_active = True elif any_cartesian: # Convert m/s to mm/s for translations translation_mm_s = [v * 1000.0 for v in cartesian_translation] env.start_jog( 'cartesian_velocity', translation=translation_mm_s, rotation=cartesian_rotation, tcp_id='Flange', coord_system_id='world' ) jogging_active = True else: # No active velocity - stop jogging env.stop_jog() jogging_active = False else: # For locomotion robots: use vx, vy, vyaw env.set_command(vx, vy, vyaw) with teleop_lock: # Update all velocity fields in teleop_action last_teleop_action["vx"] = float(vx) last_teleop_action["vy"] = float(vy) last_teleop_action["vz"] = float(vz) last_teleop_action["vyaw"] = float(vyaw) last_teleop_action["vrx"] = float(vrx) last_teleop_action["vry"] = float(vry) last_teleop_action["vrz"] = float(vrz) last_teleop_action["j1"] = float(j1) last_teleop_action["j2"] = float(j2) last_teleop_action["j3"] = float(j3) last_teleop_action["j4"] = float(j4) last_teleop_action["j5"] = float(j5) last_teleop_action["j6"] = float(j6) if gripper is not None: last_teleop_action["gripper"] = float(gripper) elif msg_type == 'reset': payload = data.get('data', {}) seed = payload.get('seed') with mujoco_lock: if env is not None: obs, info = env.reset(seed=seed) # Track episode start time for duration watchdog with episode_control_lock: episode_control_state["start_time"] = time.time() episode_control_state["terminate"] = False episode_control_state["truncate"] = False elif msg_type == 'switch_robot': payload = data.get('data', {}) robot = payload.get('robot', 'g1') scene = payload.get('scene') _schedule_robot_switch(robot, scene) elif msg_type == 'camera': payload = data.get('data', {}) a = cam.azimuth * np.pi / 180.0 e = cam.elevation * np.pi / 180.0 cam_type = payload.get('action') if cam_type == 'rotate': cam.azimuth -= payload.get('dx', 0) * 0.5 cam.elevation -= payload.get('dy', 0) * 0.5 cam.elevation = np.clip(cam.elevation, -89, 89) elif cam_type == 'zoom': cam.distance += payload.get('dz', 0) * 0.01 cam.distance = max(0.5, min(20.0, cam.distance)) elif cam_type == 'set_distance': cam.distance = payload.get('distance', 3.0) elif cam_type == 'pan': right = np.array([np.sin(a), -np.cos(a), 0]) up = np.array([-np.cos(a) * np.sin(e), -np.sin(a) * np.sin(e), np.cos(e)]) scale = cam.distance * 0.002 cam.lookat -= (payload.get('dx', 0) * right - payload.get('dy', 0) * up) * scale elif msg_type == 'camera_follow': payload = data.get('data', {}) camera_follow = payload.get('follow', True) elif msg_type == 'toggle_target_visibility': payload = data.get('data', {}) t_target_visible = payload.get('visible', True) elif msg_type == 'teleop_action': payload = data.get('data', {}) # Accept both old format (dx/dy/dz) and new format (vx/vy/vz) vx = float(payload.get('vx', payload.get('dx', 0.0))) vy = float(payload.get('vy', payload.get('dy', 0.0))) vz = float(payload.get('vz', payload.get('dz', 0.0))) timestamp = time.time() if current_robot in ("ur5", "ur5_t_push") and _is_jogging_paused(): return with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): # Check if all velocities are zero - if so, stop any active jogging if abs(vx) < 0.001 and abs(vy) < 0.001 and abs(vz) < 0.001: stop_jog = getattr(env, "stop_jog", None) if callable(stop_jog): stop_jog() else: # Apply teleop movement target = env.get_target() env.set_target( target[0] + vx, target[1] + vy, target[2] + vz, update_joint_targets=True ) updated_target = env.get_target() else: updated_target = None with teleop_lock: # Update UR5 Cartesian velocities, preserve other fields last_teleop_action["vx"] = vx last_teleop_action["vy"] = vy last_teleop_action["vz"] = vz last_teleop_action["vyaw"] = 0.0 # Clear jogging velocities when using teleop (jogging and teleop are mutually exclusive) for j in range(1, 7): last_teleop_action[f"j{j}"] = 0.0 last_teleop_action["vrx"] = 0.0 last_teleop_action["vry"] = 0.0 last_teleop_action["vrz"] = 0.0 broadcast_to_external_clients( "action_update", { "robot": current_robot, "scene": getattr(env, "scene_name", None) if env is not None else None, "delta": {"vx": vx, "vy": vy, "vz": vz}, "ee_target": updated_target.tolist() if updated_target is not None else None, "timestamp": timestamp, } ) elif msg_type == 'episode_control': payload = data.get('data', {}) action = payload.get('action') _signal_episode_control(action) # UR5-specific messages elif msg_type == 'arm_target': payload = data.get('data', {}) x = payload.get('x', 0.4) y = payload.get('y', 0.0) z = payload.get('z', 0.6) with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_target(x, y, z) elif msg_type == 'gripper': payload = data.get('data', {}) action = payload.get('action', 'open') # Robotiq 2F-85: 0 = open, 255 = closed (official MuJoCo Menagerie) if action == 'open': value = 0 # 0 = fully open elif action == 'close': value = 255 # 255 = fully closed else: value = payload.get('value', 128) with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_gripper(value) # Update teleop_action with gripper value with teleop_lock: last_teleop_action["gripper"] = float(value) elif msg_type == 'control_mode': payload = data.get('data', {}) mode = payload.get('mode', 'ik') with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_control_mode(mode) elif msg_type == 'joint_positions': payload = data.get('data', {}) positions = payload.get('positions', []) if len(positions) == 6: with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_joint_positions(positions) elif msg_type == 'arm_orientation': payload = data.get('data', {}) roll = payload.get('roll', 0.0) pitch = payload.get('pitch', np.pi/2) yaw = payload.get('yaw', 0.0) with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_target_orientation(roll, pitch, yaw) elif msg_type == 'use_orientation': payload = data.get('data', {}) use = payload.get('enabled', True) with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push"): env.set_use_orientation(use) elif msg_type == 'perception_update': # Handle perception updates from training system payload = data.get('data', {}) poses = payload.get('poses', []) timestamp = payload.get('timestamp', time.time()) with perception_lock: perception_state["poses"] = poses perception_state["last_update"] = timestamp # print(f"[Perception] Received {len(poses)} pose(s) at {timestamp:.2f}") elif msg_type == 'set_nova_mode': payload = data.get('data', {}) enabled = bool(payload.get('enabled', False)) with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push") and hasattr(env, 'set_nova_enabled'): env.set_nova_enabled(enabled) # Nova jogging commands elif msg_type == 'homing': with mujoco_lock: if env is not None: home_pose = getattr(env, "home_pose", None) set_joints = getattr(env, "set_joint_positions", None) if home_pose is not None and callable(set_joints): if hasattr(home_pose, "tolist"): positions = home_pose.tolist() else: positions = list(home_pose) set_joints(positions) def _serialize_value(value): """Recursively convert numpy data to JSON-friendly types.""" if isinstance(value, np.ndarray): return _serialize_value(value.tolist()) if isinstance(value, dict): return {str(k): _serialize_value(v) for k, v in value.items()} if isinstance(value, (list, tuple)): return [_serialize_value(v) for v in value] if isinstance(value, (np.integer,)): return int(value) if isinstance(value, (np.floating,)): return float(value) return value def _serialize_space(space): if hasattr(space, "low") and hasattr(space, "high"): import math # Convert numpy arrays to lists, replacing inf with null for valid JSON low_list = space.low.tolist() high_list = space.high.tolist() # Replace infinity values with null (JSON doesn't support Infinity) low_json = [None if math.isinf(v) else v for v in low_list] high_json = [None if math.isinf(v) else v for v in high_list] return { "type": "box", "shape": list(space.shape), "low": low_json, "high": high_json, "dtype": str(space.dtype), } return {"type": "unknown"} def _resolve_robot_scene(robot, scene): if robot == "ur5_t_push" and scene is None: return "ur5", "scene_t_push" if robot == "ur5" and scene is None: return "ur5", "scene" return robot, scene def _create_env(robot, scene): if robot == "g1": return G1Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT) if robot == "spot": spot_dir = os.path.join(_nova_sim_dir, 'robots', 'spot') sys.path.insert(0, spot_dir) from spot_env import SpotEnv sys.path.pop(0) return SpotEnv(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, controller_type='pympc') if robot == "ur5": ur5_dir = os.path.join(_nova_sim_dir, 'robots', 'ur5') sys.path.insert(0, ur5_dir) from ur5_env import UR5Env sys.path.pop(0) # scene is already resolved by _resolve_robot_scene (e.g., "scene_t_push") if scene: return UR5Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT, scene_name=scene) return UR5Env(render_mode="rgb_array", width=RENDER_WIDTH, height=RENDER_HEIGHT) raise ValueError(f"Unsupported robot for gym: {robot}") @sock.route(f'{API_PREFIX}/ws') def websocket_handler(ws): """Handle WebSocket connections.""" print('WebSocket client connected', flush=True) # Register client (track all clients, even before they identify) with ws_clients_lock: ws_clients.add(ws) # Register in metadata as unidentified _register_external_client(ws) broadcast_connected_clients_status() try: while True: message = ws.receive() if message is None: break try: data = json.loads(message) handle_ws_message(ws, data) except json.JSONDecodeError: print(f"Invalid JSON received: {message}") except Exception as e: print(f"Error handling message: {e}") except: pass finally: # Unregister client with ws_clients_lock: ws_clients.discard(ws) with external_ws_clients_lock: was_external = ws in external_ws_clients external_ws_clients.discard(ws) # Always unregister from metadata (all clients are now tracked, not just external ones) _unregister_external_client(ws) broadcast_state() broadcast_connected_clients_status() print('WebSocket client disconnected') # Serve UI at /nova-sim (no redirect) @app.route('/nova-sim') @app.route('/nova-sim/') def nova_sim_ui(): return index() # Redirect root to /nova-sim @app.route('/') def root_redirect(): return render_template_string("""

Redirecting to /nova-sim...

""") @app.route(API_PREFIX) @app.route(f'{API_PREFIX}/') def index(): # Load HTML from frontend/index.html html_path = Path(_nova_sim_dir) / 'frontend' / 'index.html' try: with open(html_path, 'r', encoding='utf-8') as f: html_template = f.read() except FileNotFoundError: return "Error: frontend/index.html not found", 500 # Replace template variables (Python string concatenation syntax from old inline HTML) html_content = html_template.replace('""" + API_PREFIX + """', API_PREFIX) response = make_response(html_content) # Prevent browser caching of the HTML/JavaScript response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' return response @app.route(f'{API_PREFIX}/video_feed') def video_feed(): return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route(f'{API_PREFIX}/cameras', methods=['POST']) def add_camera(): """Add an overlay camera for the current robot/scene.""" payload = request.get_json(silent=True) or {} replace_existing = bool(payload.get("replace", False)) try: config = _build_dynamic_camera_config(payload) except ValueError as exc: return jsonify({"error": str(exc)}), 400 with mujoco_lock: if env is None: return jsonify({"error": "no environment initialized"}), 503 scene_name = getattr(env, "scene_name", None) or current_scene robot_type = current_robot # Validate site references if provided follow_site = config.get("follow_site") look_target_site = config.get("look_target_site") if follow_site: site_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, follow_site) if site_id < 0: return jsonify({"error": f"follow_site '{follow_site}' not found"}), 400 if look_target_site: target_id = mujoco.mj_name2id(env.model, mujoco.mjtObj.mjOBJ_SITE, look_target_site) if target_id < 0: return jsonify({"error": f"look_target_site '{look_target_site}' not found"}), 400 temp_cam = _make_mjv_camera(config) feed_intrinsics = _camera_intrinsics( OVERLAY_RENDER_WIDTH, OVERLAY_RENDER_HEIGHT, _camera_fovy_degrees(temp_cam or cam, env), ) camera_payload = { "name": config["name"], "label": config.get("label", config["name"]), "url": f"{API_PREFIX}/camera/{config['name']}/video_feed", "pose": _camera_pose(temp_cam), "intrinsics": feed_intrinsics, } with overlay_camera_lock: exists = config["name"] in overlay_camera_states if exists and not replace_existing: return jsonify({"error": "camera already exists", "name": config["name"]}), 409 _store_dynamic_camera_config(robot_type, scene_name, config) state_payload = _build_overlay_camera_state(config, create_renderer=False) # Store reference to old renderer to close after state update (prevents race condition) old_renderer = None with overlay_camera_lock: if config["name"] in overlay_camera_states: old_renderer = overlay_camera_states[config["name"]].get("renderer") # Update state first before closing old renderer to prevent use-after-free overlay_camera_states[config["name"]] = state_payload with overlay_frame_lock: overlay_frames[config["name"]] = None # Close old renderer after state is updated (prevents simulation loop from accessing freed renderer) if old_renderer: try: old_renderer.close() except Exception: pass broadcast_camera_event( "added", camera_payload, {"robot": robot_type, "scene": scene_name}, ) return jsonify({ "status": "ok", "camera": camera_payload, "scope": {"robot": robot_type, "scene": scene_name}, }) @app.route(f'{API_PREFIX}/camera//video_feed') def camera_feed(name): if name == "main": return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame') with overlay_frame_lock: if name not in overlay_frames: return jsonify({"error": "Camera feed not available"}), 404 return Response(generate_overlay_frames(name), mimetype='multipart/x-mixed-replace; boundary=frame') def capture_depth_snapshot(camera_name: str) -> Optional[bytes]: """Capture a depth image from the specified camera and return as PNG bytes. Args: camera_name: Name of the camera ("main" or overlay camera name) Returns: PNG-encoded depth image as bytes, or None if capture fails """ global env, cam, overlay_camera_states if env is None: return None # Determine which camera to use camera_obj = None if camera_name == "main": camera_obj = cam else: with overlay_camera_lock: if camera_name in overlay_camera_states: camera_obj = overlay_camera_states[camera_name].get("camera") if camera_obj is None: return None # Create a temporary renderer for depth capture with mujoco_lock: try: if camera_name == "main": render_height = env.height render_width = env.width else: render_height = OVERLAY_RENDER_HEIGHT render_width = OVERLAY_RENDER_WIDTH depth_renderer = mujoco.Renderer( env.model, height=render_height, width=render_width ) # Enable depth rendering depth_renderer.enable_depth_rendering() # Update scene and render depth depth_renderer.update_scene(env.data, camera=camera_obj) depth_array = depth_renderer.render() # Returns (height, width) float32 # Close the temporary renderer depth_renderer.close() # Convert depth to 16-bit PNG in millimeters (metric depth). # MuJoCo renderer returns depth in meters; preserve metric scale. max_depth_m = 10.0 try: vis_map = getattr(env.model, "vis", None) vis_map = getattr(vis_map, "map", None) if vis_map is not None else None zfar = getattr(vis_map, "zfar", None) if vis_map is not None else None if zfar is not None: max_depth_m = float(zfar) except Exception: pass # uint16 in mm can represent up to 65.535 m max_depth_m = min(max_depth_m, 65.535) depth_m = np.nan_to_num(depth_array, nan=0.0, posinf=max_depth_m, neginf=0.0) depth_m = np.clip(depth_m, 0.0, max_depth_m) depth_16bit = (depth_m * 1000.0).round().astype(np.uint16) # Encode as PNG ret, buffer = cv2.imencode('.png', depth_16bit) if not ret: return None return buffer.tobytes() except Exception as e: print(f"Error capturing depth snapshot: {e}") traceback.print_exc() return None @app.route(f'{API_PREFIX}/camera//depth_snapshot') def camera_depth_snapshot(name): """Endpoint to capture a single depth image as PNG.""" depth_png = capture_depth_snapshot(name) if depth_png is None: return jsonify({"error": "Failed to capture depth image"}), 500 response = make_response(depth_png) response.headers['Content-Type'] = 'image/png' response.headers['Content-Disposition'] = f'inline; filename="depth_{name}.png"' return response @app.route(f'{API_PREFIX}/metadata') def metadata(): robots_meta = {} for robot, scenes in ROBOT_SCENES.items(): robots_meta[robot] = { "label": ROBOT_LABELS.get(robot, robot), "scenes": scenes, } current_selection = { "robot": current_robot, "scene": current_scene, } with mujoco_lock: if env is not None and hasattr(env, "home_pose"): try: home_pose = env.home_pose if home_pose is not None: pose_list = ( home_pose.tolist() if hasattr(home_pose, "tolist") else list(home_pose) ) current_selection["home_pose"] = [float(val) for val in pose_list] except Exception: pass response = { "robots": robots_meta, "actions": AVAILABLE_ACTIONS, "nova_api": { "preconfigured": _nova_credentials_present(), }, } return jsonify(response) @app.route(f'{API_PREFIX}/nova_config', methods=['POST']) def update_nova_config(): payload = request.get_json(silent=True) or {} field_map = { "instance_url": "NOVA_INSTANCE_URL", "access_token": "NOVA_ACCESS_TOKEN", "cell_id": "NOVA_CELL_ID", "controller_id": "NOVA_CONTROLLER_ID", "motion_group_id": "NOVA_MOTION_GROUP_ID", "motion_group_model": "NOVA_MOTION_GROUP_MODEL", "tcp_name": "NOVA_TCP_NAME", "response_rate_ms": "NOVA_RESPONSE_RATE_MS", } updated = {} for key, env_key in field_map.items(): if key not in payload: continue value = payload.get(key) if value is None or (isinstance(value, str) and value.strip() == ""): os.environ.pop(env_key, None) updated[env_key] = None else: os.environ[env_key] = str(value).strip() updated[env_key] = os.environ[env_key] if not _nova_credentials_present(): return jsonify({ "ok": False, "error": "Missing required Nova credentials", "updated": updated, "preconfigured": _nova_credentials_present(), }), 400 use_state = _parse_bool(os.environ.get("NOVA_UR5_USE_STATE_STREAM") or os.environ.get("NOVA_USE_STATE_STREAM") or "true") use_ik = _parse_bool(os.environ.get("NOVA_UR5_USE_IK") or os.environ.get("NOVA_USE_IK") or "false") use_jogging = _parse_bool(os.environ.get("NOVA_UR5_USE_JOGGING") or os.environ.get("NOVA_USE_JOGGING") or "true") nova_api_config = { "instance_url": os.getenv("NOVA_INSTANCE_URL") or os.getenv("NOVA_API"), "access_token": os.getenv("NOVA_ACCESS_TOKEN"), "cell_id": os.getenv("NOVA_CELL_ID", "cell"), "controller_id": os.getenv("NOVA_CONTROLLER_ID"), "motion_group_id": os.getenv("NOVA_MOTION_GROUP_ID"), "motion_group_model": os.getenv("NOVA_MOTION_GROUP_MODEL"), "tcp_name": os.getenv("NOVA_TCP_NAME"), "response_rate_ms": int(os.getenv("NOVA_RESPONSE_RATE_MS", "200")), "use_state_stream": use_state, "use_ik": use_ik, "use_jogging": use_jogging, } reconfigured = False with mujoco_lock: if env is not None and current_robot in ("ur5", "ur5_t_push") and hasattr(env, "set_nova_enabled"): try: if hasattr(env, "_nova_api_config"): env._nova_api_config = dict(nova_api_config) reconfigured = bool(env.set_nova_enabled(True)) except Exception: traceback.print_exc() return jsonify({ "ok": False, "error": "Failed to reconfigure Nova integration", "updated": updated, "preconfigured": _nova_credentials_present(), }), 500 return jsonify({ "ok": True, "updated": updated, "preconfigured": _nova_credentials_present(), "reconfigured": reconfigured, }) def _env_request(func): @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception: traceback.print_exc() raise return wrapper @app.route(f'{API_PREFIX}/env') @_env_request def get_env_info(): """Get static information about the current environment.""" with mujoco_lock: if env is None: return jsonify({"error": "no environment initialized"}), 503 # Get action and observation spaces action_space = _serialize_space(env.action_space) observation_space = _serialize_space(env.observation_space) # Get static environment properties has_gripper = getattr(env, 'has_gripper', False) scene_name = getattr(env, 'scene_name', None) control_mode = getattr(env, 'get_control_mode', lambda: None)() # Build camera feed URLs with pose + intrinsics camera_feeds = [] main_fovy = _camera_fovy_degrees(cam, env) main_intrinsics = _camera_intrinsics(env.width, env.height, main_fovy) camera_feeds.append({ "name": "main", "label": "Main Camera", "url": f"{API_PREFIX}/video_feed", "pose": _camera_pose(cam), "intrinsics": main_intrinsics, }) # Check for overlay cameras based on scene configs = (scene_name and OVERLAY_CAMERA_PRESETS.get(scene_name)) or OVERLAY_CAMERA_PRESETS.get(current_robot) or [] dynamic_configs = _get_dynamic_camera_configs(current_robot, scene_name) if dynamic_configs: merged_configs: dict[str, dict[str, Any]] = {} for feed in configs: merged_configs[feed.get("name", "aux")] = feed for feed in dynamic_configs: merged_configs[feed.get("name", "aux")] = feed configs = list(merged_configs.values()) for feed in configs: feed_name = feed.get('name', 'aux') with overlay_camera_lock: cam_state = overlay_camera_states.get(feed_name, {}).copy() cam_obj = cam_state.get("camera") feed_intrinsics = _camera_intrinsics( OVERLAY_RENDER_WIDTH, OVERLAY_RENDER_HEIGHT, _camera_fovy_degrees(cam_obj or cam, env), ) feed_payload = { "name": feed_name, "label": feed.get('label', feed_name), "url": f"{API_PREFIX}/camera/{feed_name}/video_feed" } if cam_obj is not None: feed_payload["pose"] = _camera_pose(cam_obj) feed_payload["intrinsics"] = feed_intrinsics camera_feeds.append({ **feed_payload }) # Get home pose for current robot home_pose_list = None if hasattr(env, "home_pose"): try: home_pose = env.home_pose if home_pose is not None: pose_list = ( home_pose.tolist() if hasattr(home_pose, "tolist") else list(home_pose) ) home_pose_list = [float(val) for val in pose_list] except Exception: pass response = { "robot": current_robot, "scene": scene_name or current_scene, "has_gripper": has_gripper, "control_mode": control_mode, "action_space": action_space, "observation_space": observation_space, "camera_feeds": camera_feeds } # Add home_pose if available if home_pose_list is not None: response["home_pose"] = home_pose_list return jsonify(response) @app.route(f'{API_PREFIX}/homing', methods=['GET']) @_env_request def home_blocking(): timeout_s = float(request.args.get("timeout_s", 30.0)) tolerance = float(request.args.get("tolerance", 0.01)) poll_interval = float(request.args.get("poll_interval_s", 0.1)) if timeout_s <= 0: return jsonify({"error": "timeout_s must be > 0"}), 400 if tolerance <= 0: return jsonify({"error": "tolerance must be > 0"}), 400 if poll_interval <= 0: return jsonify({"error": "poll_interval_s must be > 0"}), 400 with mujoco_lock: if env is None: return jsonify({"error": "no environment initialized"}), 503 if current_robot not in ("ur5", "ur5_t_push"): with mujoco_lock: home_pose = getattr(env, "home_pose", None) set_joints = getattr(env, "set_joint_positions", None) if home_pose is None or not callable(set_joints): return jsonify({"error": "homing not supported for current robot"}), 400 positions = home_pose.tolist() if hasattr(home_pose, "tolist") else list(home_pose) set_joints(positions) return jsonify({"status": "completed", "robot": current_robot, "homed": True}) with homing_lock: homing_active = homing_state.get("active") if homing_active: active_start_time = time.time() while True: with homing_lock: if not homing_state.get("active"): break if time.time() - active_start_time >= timeout_s: return jsonify({"status": "timeout", "robot": current_robot, "homed": False}), 200 time.sleep(min(poll_interval, 0.5)) with mujoco_lock, homing_lock: home_pose = getattr(env, "home_pose", None) get_joints = getattr(env, "get_joint_positions", None) start_multi_joint_jog = getattr(env, "start_multi_joint_jog", None) stop_jog = getattr(env, "stop_jog", None) if home_pose is None or not callable(get_joints) or not callable(start_multi_joint_jog) or not callable(stop_jog): return jsonify({"error": "Nova homing not supported for current robot"}), 400 homing_state["active"] = True previous_pause = _set_jogging_paused(True) start_time = time.time() status = "running" result_snapshot = {} try: with mujoco_lock: stop_jog = getattr(env, "stop_jog", None) if callable(stop_jog): stop_jog() while True: with mujoco_lock, homing_lock: result_snapshot = _step_multi_axis_homing_locked(tolerance=tolerance) if result_snapshot.get("status") == "done": status = "completed" break if time.time() - start_time >= timeout_s: status = "timeout" break time.sleep(poll_interval) finally: with mujoco_lock, homing_lock: stop_jog = getattr(env, "stop_jog", None) if callable(stop_jog) and homing_state.get("is_jogging", False): stop_jog() homing_state["active"] = False homing_state["is_jogging"] = False homing_state.pop("previous_errors", None) homing_state.pop("current_velocities", None) _set_jogging_paused(previous_pause) with mujoco_lock: home_pose = getattr(env, "home_pose", None) get_joints = getattr(env, "get_joint_positions", None) final_positions = get_joints() if callable(get_joints) else [] home_pose_list = ( home_pose.tolist() if home_pose is not None and hasattr(home_pose, "tolist") else (list(home_pose) if home_pose is not None else None) ) final_errors = None max_abs_error = None if home_pose_list is not None and final_positions is not None and len(final_positions) > 0: final_errors = [float(t - c) for t, c in zip(home_pose_list, final_positions)] max_abs_error = max(abs(e) for e in final_errors) final_positions_list = [float(v) for v in final_positions] if final_positions is not None and len(final_positions) > 0 else None response = { "status": status, "robot": current_robot, "homed": status == "completed", "duration_s": time.time() - start_time, "home_pose": home_pose_list, "final_positions": final_positions_list, "final_errors": final_errors, "max_abs_error": max_abs_error, "jogging_paused": _is_jogging_paused(), } return jsonify(response) if __name__ == '__main__': port = int(os.environ.get("PORT", "3004")) app.run(host='0.0.0.0', port=port, debug=False, threaded=True)