Georg
Implement dynamic camera management and enhance overlay functionality in mujoco_server.py
2425670 | """Tests for dynamic camera endpoint and state stream announcements. | |
| Run with server running: | |
| python nova-sim/mujoco_server.py | |
| Then in another terminal: | |
| pytest nova-sim/tests/test_dynamic_camera.py -v | |
| """ | |
| import json | |
| import time | |
| import pytest | |
| import requests | |
| from websockets.sync.client import connect | |
| BASE_URL = "http://localhost:3004/nova-sim/api/v1" | |
| WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws" | |
| def check_server(): | |
| """Check if the server is running before tests.""" | |
| try: | |
| response = requests.get(f"{BASE_URL}/metadata", timeout=2) | |
| response.raise_for_status() | |
| except requests.RequestException: | |
| pytest.skip("Nova-Sim server is not running at localhost:3004") | |
| def test_add_dynamic_camera_announces_state(check_server): | |
| """Add a camera and verify state stream announcement + env listing.""" | |
| camera_name = "aux_side" | |
| payload = { | |
| "name": camera_name, | |
| "label": "Aux Side View", | |
| "lookat": [0.55, -0.1, 0.42], | |
| "distance": 0.9, | |
| "azimuth": -45, | |
| "elevation": -30, | |
| "replace": True, | |
| } | |
| with connect(WS_URL, timeout=10) as ws: | |
| ws.send(json.dumps({ | |
| "type": "client_identity", | |
| "data": {"client_id": "test-dynamic-camera"} | |
| })) | |
| resp = requests.post(f"{BASE_URL}/cameras", json=payload, timeout=5) | |
| assert resp.status_code == 200 | |
| data = resp.json() | |
| assert data["camera"]["name"] == camera_name | |
| found_event = False | |
| deadline = time.time() + 5 | |
| while time.time() < deadline: | |
| try: | |
| msg = ws.recv(timeout=1) | |
| except TimeoutError: | |
| continue | |
| parsed = json.loads(msg) | |
| if parsed.get("type") != "state": | |
| continue | |
| event = (parsed.get("data") or {}).get("camera_event") | |
| if not event: | |
| continue | |
| if event.get("action") == "added" and (event.get("camera") or {}).get("name") == camera_name: | |
| found_event = True | |
| break | |
| assert found_event, "Did not receive camera_event state message" | |
| env_resp = requests.get(f"{BASE_URL}/env", timeout=5) | |
| env_resp.raise_for_status() | |
| env_data = env_resp.json() | |
| feed_names = [feed.get("name") for feed in env_data.get("camera_feeds", [])] | |
| assert camera_name in feed_names | |