| import importlib.util |
| import json |
| import math |
| import os |
| import threading |
| import time |
| import urllib.error |
| import urllib.request |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, Optional, Tuple |
|
|
| try: |
| from websockets.sync.client import connect |
| except Exception as e: |
| print(f"[Nova] WARNING: Failed to import websockets: {type(e).__name__}: {e}") |
| print(f"[Nova] Install websockets with: pip install websockets") |
| connect = None |
|
|
|
|
| def _load_local_env_module(): |
| try: |
| from robots.ur5 import env_loader |
| except ImportError: |
| env_path = Path(__file__).resolve().parent / "env_loader.py" |
| spec = importlib.util.spec_from_file_location("robots.ur5.env_loader", env_path) |
| module = importlib.util.module_from_spec(spec) |
| spec.loader.exec_module(module) |
| env_loader = module |
| env_loader.load_local_env() |
|
|
|
|
| _load_local_env_module() |
|
|
|
|
| @dataclass |
| class NovaApiConfig: |
| instance_url: str |
| access_token: str |
| cell_id: str |
| controller_id: str |
| motion_group_id: str |
| motion_group_model: Optional[str] = None |
| tcp_name: Optional[str] = None |
| response_rate_ms: int = 200 |
|
|
| @classmethod |
| def from_env(cls, override: Optional[Dict[str, Any]] = None) -> "NovaApiConfig": |
| """Create NovaApiConfig from environment variables. |
| |
| Environment variables: |
| NOVA_INSTANCE_URL: Nova instance URL (e.g., https://nova.wandelbots.io) |
| NOVA_ACCESS_TOKEN: Nova API access token |
| NOVA_CELL_ID: Cell ID |
| NOVA_CONTROLLER_ID: Controller ID |
| NOVA_MOTION_GROUP_ID: Motion group ID |
| NOVA_MOTION_GROUP_MODEL: Motion group model (optional) |
| NOVA_TCP_NAME: TCP name (optional, default: Flange) |
| NOVA_RESPONSE_RATE_MS: Response rate in milliseconds (optional, default: 200) |
| |
| Args: |
| override: Optional dict to override specific values from env vars |
| |
| Returns: |
| NovaApiConfig instance |
| |
| Raises: |
| ValueError: If required environment variables are not set |
| """ |
| override = override or {} |
|
|
| |
| |
| instance_url = override.get("instance_url") or os.getenv("NOVA_INSTANCE_URL") or os.getenv("NOVA_API") |
| access_token = override.get("access_token") or os.getenv("NOVA_ACCESS_TOKEN") |
| cell_id = override.get("cell_id") or os.getenv("NOVA_CELL_ID", "cell") |
| controller_id = override.get("controller_id") or os.getenv("NOVA_CONTROLLER_ID") |
| motion_group_id = override.get("motion_group_id") or os.getenv("NOVA_MOTION_GROUP_ID") |
|
|
| |
| missing = [] |
| if not instance_url: |
| missing.append("NOVA_INSTANCE_URL or NOVA_API") |
| if not access_token: |
| missing.append("NOVA_ACCESS_TOKEN") |
| if not controller_id: |
| missing.append("NOVA_CONTROLLER_ID") |
| if not motion_group_id: |
| missing.append("NOVA_MOTION_GROUP_ID") |
|
|
| if missing: |
| raise ValueError( |
| f"Missing required environment variables: {', '.join(missing)}. " |
| "Please set these environment variables or pass them in the override dict." |
| ) |
|
|
| |
| motion_group_model = override.get("motion_group_model") or os.getenv("NOVA_MOTION_GROUP_MODEL") |
| tcp_name = override.get("tcp_name") or os.getenv("NOVA_TCP_NAME", "Flange") |
| response_rate_ms = override.get("response_rate_ms") or int(os.getenv("NOVA_RESPONSE_RATE_MS", "200")) |
|
|
| return cls( |
| instance_url=instance_url, |
| access_token=access_token, |
| cell_id=cell_id, |
| controller_id=controller_id, |
| motion_group_id=motion_group_id, |
| motion_group_model=motion_group_model, |
| tcp_name=tcp_name, |
| response_rate_ms=response_rate_ms, |
| ) |
|
|
|
|
| class NovaApiClient: |
| def __init__(self, config: NovaApiConfig): |
| self.config = config |
| self._state_lock = threading.Lock() |
| self._latest_state: Optional[Dict[str, Any]] = None |
| self._stop_event = threading.Event() |
| self._state_thread: Optional[threading.Thread] = None |
| self._state_ws = None |
| self._motion_group_description: Optional[Dict[str, Any]] = None |
| self._last_state_log_time = 0.0 |
|
|
| def start_state_stream(self) -> None: |
| if connect is None: |
| raise RuntimeError("websockets is required for Nova state streaming") |
| if self._state_thread is not None: |
| return |
| print("[Nova] Starting state stream thread...") |
| self._state_thread = threading.Thread(target=self._state_stream_loop, daemon=True) |
| self._state_thread.start() |
|
|
| def stop(self) -> None: |
| self._stop_event.set() |
| if self._state_ws is not None: |
| try: |
| self._state_ws.close() |
| except Exception: |
| pass |
| if self._state_thread is not None: |
| self._state_thread.join(timeout=2.0) |
| self._state_thread = None |
|
|
| def get_latest_state(self) -> Optional[Dict[str, Any]]: |
| with self._state_lock: |
| if self._latest_state is None: |
| return None |
| return dict(self._latest_state) |
|
|
| def is_state_stream_connected(self) -> bool: |
| """Check if state stream is connected and receiving data.""" |
| return self._state_ws is not None and self._latest_state is not None |
|
|
| def inverse_kinematics(self, position_m, orientation_quat) -> Optional[list]: |
| model = self._ensure_motion_group_model() |
| if not model: |
| return None |
|
|
| pose = { |
| "position": [float(position_m[0] * 1000.0), float(position_m[1] * 1000.0), float(position_m[2] * 1000.0)], |
| "orientation": _quat_to_rotvec(orientation_quat), |
| } |
|
|
| payload: Dict[str, Any] = { |
| "motion_group_model": model, |
| "tcp_poses": [pose], |
| } |
| description = self._ensure_motion_group_description() |
| if description: |
| if description.get("mounting"): |
| payload["mounting"] = description["mounting"] |
| if self.config.tcp_name and description.get("tcps", {}).get(self.config.tcp_name): |
| payload["tcp_offset"] = description["tcps"][self.config.tcp_name]["pose"] |
|
|
| response = self._request_json( |
| "POST", |
| f"/api/v2/cells/{self.config.cell_id}/kinematic/inverse", |
| payload, |
| ) |
| joints = response.get("joints", []) |
| if not joints: |
| return None |
|
|
| first_entry = joints[0] |
| if isinstance(first_entry, list) and first_entry and isinstance(first_entry[0], list): |
| return first_entry[0] |
| if isinstance(first_entry, list): |
| return first_entry |
| return None |
|
|
| def _state_stream_loop(self) -> None: |
| backoff = 1.0 |
| while not self._stop_event.is_set(): |
| try: |
| ws = self._connect_state_stream() |
| self._state_ws = ws |
| backoff = 1.0 |
| for message in ws: |
| if self._stop_event.is_set(): |
| break |
| try: |
| data = json.loads(message) |
| except json.JSONDecodeError: |
| continue |
| with self._state_lock: |
| self._latest_state = data |
| now = time.time() |
| if now - self._last_state_log_time >= 10.0: |
| self._last_state_log_time = now |
| result = data.get("result", data) |
| seq = result.get("sequence_number", "N/A") |
| joints = result.get("joint_position") |
| joint_info = f"{len(joints)} joints" if isinstance(joints, list) else "no joints" |
| payload = { |
| "seq": seq, |
| "joint_position": joints, |
| "timestamp": result.get("timestamp") |
| } |
| payload_str = json.dumps(payload, default=str) |
| print(f"[Nova] State stream update (seq={seq}, {joint_info}) payload={payload_str}") |
| ws.close() |
| except Exception as exc: |
| print(f"Nova state stream error: {exc}") |
| finally: |
| self._state_ws = None |
| if self._stop_event.is_set(): |
| break |
| time.sleep(backoff) |
| backoff = min(backoff * 2.0, 10.0) |
|
|
| def _connect_state_stream(self): |
| |
| ws_url = self.config.instance_url.replace("https://", "wss://").replace("http://", "ws://") |
| url = ( |
| f"{ws_url}/api/v2/cells/{self.config.cell_id}/controllers/" |
| f"{self.config.controller_id}/motion-groups/{self.config.motion_group_id}/state-stream" |
| f"?response_rate={int(self.config.response_rate_ms)}" |
| ) |
| print(f"[Nova] Connecting to state stream: {url}") |
| headers = [("Authorization", f"Bearer {self.config.access_token}")] |
| try: |
| ws = connect(url, open_timeout=10, additional_headers=headers) |
| print(f"[Nova] State stream connected successfully") |
| return ws |
| except TypeError: |
| print(f"[Nova] Retrying connection with extra_headers parameter...") |
| ws = connect(url, open_timeout=10, extra_headers=headers) |
| print(f"[Nova] State stream connected successfully") |
| return ws |
|
|
| def _ensure_motion_group_description(self) -> Optional[Dict[str, Any]]: |
| if self._motion_group_description is not None: |
| return self._motion_group_description |
| description = self._request_json( |
| "GET", |
| f"/api/v2/cells/{self.config.cell_id}/controllers/{self.config.controller_id}/" |
| f"motion-groups/{self.config.motion_group_id}/description", |
| None, |
| ) |
| self._motion_group_description = description |
| return description |
|
|
| def _ensure_motion_group_model(self) -> Optional[str]: |
| if self.config.motion_group_model: |
| return self.config.motion_group_model |
| description = self._ensure_motion_group_description() |
| if description and description.get("motion_group_model"): |
| self.config.motion_group_model = description["motion_group_model"] |
| return self.config.motion_group_model |
|
|
| def _request_json(self, method: str, path: str, payload: Optional[Dict[str, Any]]): |
| url = f"{self.config.instance_url}{path}" |
| data = None |
| if payload is not None: |
| data = json.dumps(payload).encode("utf-8") |
| request = urllib.request.Request(url, data=data, method=method) |
| request.add_header("Authorization", f"Bearer {self.config.access_token}") |
| request.add_header("Content-Type", "application/json") |
| try: |
| with urllib.request.urlopen(request, timeout=10) as response: |
| body = response.read().decode("utf-8") |
| return json.loads(body) if body else {} |
| except urllib.error.HTTPError as exc: |
| body = exc.read().decode("utf-8") if exc.fp else "" |
| raise RuntimeError(f"Nova API error {exc.code}: {body}") from exc |
|
|
|
|
| def _quat_to_rotvec(quat) -> list: |
| w, x, y, z = [float(v) for v in quat] |
| w = max(min(w, 1.0), -1.0) |
| angle = 2.0 * math.acos(w) |
| s = math.sqrt(max(0.0, 1.0 - w * w)) |
| if s < 1e-8: |
| return [0.0, 0.0, 0.0] |
| axis = [x / s, y / s, z / s] |
| return [axis[0] * angle, axis[1] * angle, axis[2] * angle] |
|
|
|
|
| def rotvec_to_quat(rotvec) -> Tuple[float, float, float, float]: |
| rx, ry, rz = [float(v) for v in rotvec] |
| angle = math.sqrt(rx * rx + ry * ry + rz * rz) |
| if angle < 1e-8: |
| return 1.0, 0.0, 0.0, 0.0 |
| axis = (rx / angle, ry / angle, rz / angle) |
| half = angle * 0.5 |
| sin_half = math.sin(half) |
| return ( |
| math.cos(half), |
| axis[0] * sin_half, |
| axis[1] * sin_half, |
| axis[2] * sin_half, |
| ) |
|
|