import importlib.util import json import math import os import threading import time import urllib.error import urllib.request from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional, Tuple try: from websockets.sync.client import connect except Exception as e: # pragma: no cover - optional dependency print(f"[Nova] WARNING: Failed to import websockets: {type(e).__name__}: {e}") print(f"[Nova] Install websockets with: pip install websockets") connect = None def _load_local_env_module(): try: from robots.ur5 import env_loader except ImportError: env_path = Path(__file__).resolve().parent / "env_loader.py" spec = importlib.util.spec_from_file_location("robots.ur5.env_loader", env_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) env_loader = module env_loader.load_local_env() _load_local_env_module() @dataclass class NovaApiConfig: instance_url: str access_token: str cell_id: str controller_id: str motion_group_id: str motion_group_model: Optional[str] = None tcp_name: Optional[str] = None response_rate_ms: int = 200 @classmethod def from_env(cls, override: Optional[Dict[str, Any]] = None) -> "NovaApiConfig": """Create NovaApiConfig from environment variables. Environment variables: NOVA_INSTANCE_URL: Nova instance URL (e.g., https://nova.wandelbots.io) NOVA_ACCESS_TOKEN: Nova API access token NOVA_CELL_ID: Cell ID NOVA_CONTROLLER_ID: Controller ID NOVA_MOTION_GROUP_ID: Motion group ID NOVA_MOTION_GROUP_MODEL: Motion group model (optional) NOVA_TCP_NAME: TCP name (optional, default: Flange) NOVA_RESPONSE_RATE_MS: Response rate in milliseconds (optional, default: 200) Args: override: Optional dict to override specific values from env vars Returns: NovaApiConfig instance Raises: ValueError: If required environment variables are not set """ override = override or {} # Required fields # Support both NOVA_INSTANCE_URL and NOVA_API for backward compatibility instance_url = override.get("instance_url") or os.getenv("NOVA_INSTANCE_URL") or os.getenv("NOVA_API") access_token = override.get("access_token") or os.getenv("NOVA_ACCESS_TOKEN") cell_id = override.get("cell_id") or os.getenv("NOVA_CELL_ID", "cell") controller_id = override.get("controller_id") or os.getenv("NOVA_CONTROLLER_ID") motion_group_id = override.get("motion_group_id") or os.getenv("NOVA_MOTION_GROUP_ID") # Validate required fields missing = [] if not instance_url: missing.append("NOVA_INSTANCE_URL or NOVA_API") if not access_token: missing.append("NOVA_ACCESS_TOKEN") if not controller_id: missing.append("NOVA_CONTROLLER_ID") if not motion_group_id: missing.append("NOVA_MOTION_GROUP_ID") if missing: raise ValueError( f"Missing required environment variables: {', '.join(missing)}. " "Please set these environment variables or pass them in the override dict." ) # Optional fields motion_group_model = override.get("motion_group_model") or os.getenv("NOVA_MOTION_GROUP_MODEL") tcp_name = override.get("tcp_name") or os.getenv("NOVA_TCP_NAME", "Flange") response_rate_ms = override.get("response_rate_ms") or int(os.getenv("NOVA_RESPONSE_RATE_MS", "200")) return cls( instance_url=instance_url, access_token=access_token, cell_id=cell_id, controller_id=controller_id, motion_group_id=motion_group_id, motion_group_model=motion_group_model, tcp_name=tcp_name, response_rate_ms=response_rate_ms, ) class NovaApiClient: def __init__(self, config: NovaApiConfig): self.config = config self._state_lock = threading.Lock() self._latest_state: Optional[Dict[str, Any]] = None self._stop_event = threading.Event() self._state_thread: Optional[threading.Thread] = None self._state_ws = None self._motion_group_description: Optional[Dict[str, Any]] = None self._last_state_log_time = 0.0 def start_state_stream(self) -> None: if connect is None: raise RuntimeError("websockets is required for Nova state streaming") if self._state_thread is not None: return print("[Nova] Starting state stream thread...") self._state_thread = threading.Thread(target=self._state_stream_loop, daemon=True) self._state_thread.start() def stop(self) -> None: self._stop_event.set() if self._state_ws is not None: try: self._state_ws.close() except Exception: pass if self._state_thread is not None: self._state_thread.join(timeout=2.0) self._state_thread = None def get_latest_state(self) -> Optional[Dict[str, Any]]: with self._state_lock: if self._latest_state is None: return None return dict(self._latest_state) def is_state_stream_connected(self) -> bool: """Check if state stream is connected and receiving data.""" return self._state_ws is not None and self._latest_state is not None def inverse_kinematics(self, position_m, orientation_quat) -> Optional[list]: model = self._ensure_motion_group_model() if not model: return None pose = { "position": [float(position_m[0] * 1000.0), float(position_m[1] * 1000.0), float(position_m[2] * 1000.0)], "orientation": _quat_to_rotvec(orientation_quat), } payload: Dict[str, Any] = { "motion_group_model": model, "tcp_poses": [pose], } description = self._ensure_motion_group_description() if description: if description.get("mounting"): payload["mounting"] = description["mounting"] if self.config.tcp_name and description.get("tcps", {}).get(self.config.tcp_name): payload["tcp_offset"] = description["tcps"][self.config.tcp_name]["pose"] response = self._request_json( "POST", f"/api/v2/cells/{self.config.cell_id}/kinematic/inverse", payload, ) joints = response.get("joints", []) if not joints: return None first_entry = joints[0] if isinstance(first_entry, list) and first_entry and isinstance(first_entry[0], list): return first_entry[0] if isinstance(first_entry, list): return first_entry return None def _state_stream_loop(self) -> None: backoff = 1.0 while not self._stop_event.is_set(): try: ws = self._connect_state_stream() self._state_ws = ws backoff = 1.0 for message in ws: if self._stop_event.is_set(): break try: data = json.loads(message) except json.JSONDecodeError: continue with self._state_lock: self._latest_state = data now = time.time() if now - self._last_state_log_time >= 10.0: self._last_state_log_time = now result = data.get("result", data) seq = result.get("sequence_number", "N/A") joints = result.get("joint_position") joint_info = f"{len(joints)} joints" if isinstance(joints, list) else "no joints" payload = { "seq": seq, "joint_position": joints, "timestamp": result.get("timestamp") } payload_str = json.dumps(payload, default=str) print(f"[Nova] State stream update (seq={seq}, {joint_info}) payload={payload_str}") ws.close() except Exception as exc: print(f"Nova state stream error: {exc}") finally: self._state_ws = None if self._stop_event.is_set(): break time.sleep(backoff) backoff = min(backoff * 2.0, 10.0) def _connect_state_stream(self): # Convert HTTP(S) URL to WebSocket URL ws_url = self.config.instance_url.replace("https://", "wss://").replace("http://", "ws://") url = ( f"{ws_url}/api/v2/cells/{self.config.cell_id}/controllers/" f"{self.config.controller_id}/motion-groups/{self.config.motion_group_id}/state-stream" f"?response_rate={int(self.config.response_rate_ms)}" ) print(f"[Nova] Connecting to state stream: {url}") headers = [("Authorization", f"Bearer {self.config.access_token}")] try: ws = connect(url, open_timeout=10, additional_headers=headers) print(f"[Nova] State stream connected successfully") return ws except TypeError: print(f"[Nova] Retrying connection with extra_headers parameter...") ws = connect(url, open_timeout=10, extra_headers=headers) print(f"[Nova] State stream connected successfully") return ws def _ensure_motion_group_description(self) -> Optional[Dict[str, Any]]: if self._motion_group_description is not None: return self._motion_group_description description = self._request_json( "GET", f"/api/v2/cells/{self.config.cell_id}/controllers/{self.config.controller_id}/" f"motion-groups/{self.config.motion_group_id}/description", None, ) self._motion_group_description = description return description def _ensure_motion_group_model(self) -> Optional[str]: if self.config.motion_group_model: return self.config.motion_group_model description = self._ensure_motion_group_description() if description and description.get("motion_group_model"): self.config.motion_group_model = description["motion_group_model"] return self.config.motion_group_model def _request_json(self, method: str, path: str, payload: Optional[Dict[str, Any]]): url = f"{self.config.instance_url}{path}" data = None if payload is not None: data = json.dumps(payload).encode("utf-8") request = urllib.request.Request(url, data=data, method=method) request.add_header("Authorization", f"Bearer {self.config.access_token}") request.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(request, timeout=10) as response: body = response.read().decode("utf-8") return json.loads(body) if body else {} except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8") if exc.fp else "" raise RuntimeError(f"Nova API error {exc.code}: {body}") from exc def _quat_to_rotvec(quat) -> list: w, x, y, z = [float(v) for v in quat] w = max(min(w, 1.0), -1.0) angle = 2.0 * math.acos(w) s = math.sqrt(max(0.0, 1.0 - w * w)) if s < 1e-8: return [0.0, 0.0, 0.0] axis = [x / s, y / s, z / s] return [axis[0] * angle, axis[1] * angle, axis[2] * angle] def rotvec_to_quat(rotvec) -> Tuple[float, float, float, float]: rx, ry, rz = [float(v) for v in rotvec] angle = math.sqrt(rx * rx + ry * ry + rz * rz) if angle < 1e-8: return 1.0, 0.0, 0.0, 0.0 axis = (rx / angle, ry / angle, rz / angle) half = angle * 0.5 sin_half = math.sin(half) return ( math.cos(half), axis[0] * sin_half, axis[1] * sin_half, axis[2] * sin_half, )