File size: 2,475 Bytes
0551bb3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59625d9
0551bb3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e9e2294
 
 
 
 
 
 
 
 
 
 
 
 
0551bb3
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Lightweight WebSocket test client for nova-sim tests (no external dependencies)."""

import json
import time
import requests
from websockets.sync.client import connect


class NovaSimTestClient:
    """Minimal synchronous client for exercising nova-sim in tests.

    State frames are read from a WebSocket opened at ``<base_url>/ws``; the
    blocking homing call goes over plain HTTP via ``requests``.  The client
    can also be used as a context manager, which connects on entry and closes
    the socket on exit.

    Attributes:
        base_url: HTTP(S) root of the nova-sim API, exactly as passed in.
        ws: The open WebSocket connection, or ``None`` while disconnected.
        latest_state: The ``"data"`` payload of the newest ``"state"`` message
            seen so far; ``{}`` until one arrives.
    """

    # Upper bound on frames consumed by a single _receive_state() call, so a
    # server that produces frames faster than they are parsed cannot keep the
    # drain loop spinning forever.
    _MAX_DRAIN = 1000

    def __init__(self, base_url: str = "http://localhost:3004/nova-sim/api/v1"):
        self.base_url = base_url
        self.ws = None
        self.latest_state = {}

    def __enter__(self) -> "NovaSimTestClient":
        """Connect and return the client itself."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the socket; exceptions from the ``with`` body propagate."""
        self.close()

    def _ws_url(self) -> str:
        """Derive the WebSocket URL (``ws://`` / ``wss://``) from ``base_url``."""
        # Strip a trailing slash so "http://host/api/" does not become ".../api//ws".
        base = self.base_url.rstrip("/")
        # Only the scheme prefix is rewritten, never a match further along the URL.
        if base.startswith("https://"):
            base = "wss://" + base[len("https://"):]
        elif base.startswith("http://"):
            base = "ws://" + base[len("http://"):]
        return base + "/ws"

    def connect(self) -> None:
        """Open the WebSocket, announce the client identity, read the first state.

        Any connection left over from an earlier ``connect()`` is closed first.
        If the handshake messages fail, the freshly opened socket is closed
        before the exception is re-raised, so no connection is leaked.
        """
        self.close()
        self.ws = connect(self._ws_url(), open_timeout=10)
        try:
            # Send client identity
            self.ws.send(json.dumps({
                "type": "client_identity",
                "data": {"client_id": "test_client"}
            }))

            # Give the server a moment to start sending, then pick up the state.
            time.sleep(0.5)
            self._receive_state()
        except Exception:
            self.close()
            raise

    def close(self) -> None:
        """Close the WebSocket connection; a no-op when already closed."""
        sock = self.ws
        if sock is None:
            return
        try:
            sock.close()
        finally:
            # Forget the socket even if close() itself raised.
            self.ws = None

    def _receive_state(self) -> None:
        """Consume pending messages and keep the newest ``"state"`` payload.

        Waits up to 0.1 s for a first message, then polls with a zero timeout
        for frames that are already buffered.  Reading everything that is
        queued (instead of a single frame) keeps ``latest_state`` current when
        frames arrive faster than this method is called, and lets a state
        frame be found behind messages of other types.  Non-state and
        non-object messages are ignored.

        Raises:
            RuntimeError: If the client is not connected.
            json.JSONDecodeError: If a received message is not valid JSON.
        """
        if self.ws is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")

        wait = 0.1
        for _ in range(self._MAX_DRAIN):
            try:
                msg = self.ws.recv(timeout=wait)
            except TimeoutError:
                break
            # After the first frame, only take what is already buffered.
            wait = 0
            if not msg:
                continue
            data = json.loads(msg)
            if isinstance(data, dict) and data.get("type") == "state":
                # A missing or null payload is stored as an empty dict so the
                # getters below can always call .get() on it.
                self.latest_state = data.get("data") or {}

    def send_message(self, msg: dict) -> None:
        """Send ``msg`` as JSON, then refresh ``latest_state``.

        Raises:
            RuntimeError: If the client is not connected.
        """
        if self.ws is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")
        self.ws.send(json.dumps(msg))
        self._receive_state()

    def home_blocking(self, timeout_s: float = 30.0, tolerance: float = 0.01, poll_interval_s: float = 0.1):
        """Invoke the blocking homing endpoint (``GET <base_url>/homing``).

        The three arguments are forwarded unchanged as query parameters.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        resp = requests.get(
            f"{self.base_url.rstrip('/')}/homing",
            params={
                "timeout_s": timeout_s,
                "tolerance": tolerance,
                "poll_interval_s": poll_interval_s,
            },
            # HTTP timeout is kept 5 s above timeout_s (never below 5 s) --
            # presumably so the endpoint's own timeout fires first.
            timeout=max(5.0, timeout_s + 5.0),
        )
        resp.raise_for_status()
        return resp.json()

    def get_joint_positions(self):
        """Return ``observation.joint_positions`` from the latest state.

        Falls back to six zeros when no observation or joint positions have
        been received yet.
        """
        obs = self.latest_state.get("observation") or {}
        return obs.get("joint_positions", [0, 0, 0, 0, 0, 0])

    def get_scene_objects(self):
        """Return ``scene_objects`` from the latest state (``[]`` if absent)."""
        return self.latest_state.get("scene_objects", [])