nova-sim / tests /test_dynamic_camera.py
Georg
Implement dynamic camera management and enhance overlay functionality in mujoco_server.py
2425670
"""Tests for dynamic camera endpoint and state stream announcements.
Start the server first:
python nova-sim/mujoco_server.py
Then in another terminal:
pytest nova-sim/tests/test_dynamic_camera.py -v
"""
import json
import time
import pytest
import requests
from websockets.sync.client import connect
# Root of the Nova-Sim REST API; the tests below append /metadata, /cameras and /env.
BASE_URL = "http://localhost:3004/nova-sim/api/v1"
# WebSocket endpoint the test connects to in order to read "state" messages.
WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws"
@pytest.fixture(scope="module")
def check_server():
    """Skip dependent tests unless the Nova-Sim server answers.

    Issues a GET against the ``/metadata`` endpoint with a 2 second timeout.
    Any ``requests`` failure (connection error, timeout, or a non-success
    HTTP status surfaced by ``raise_for_status``) results in ``pytest.skip``.
    """
    metadata_url = f"{BASE_URL}/metadata"
    try:
        # Chain the status check so transport and HTTP errors share one handler.
        requests.get(metadata_url, timeout=2).raise_for_status()
    except requests.RequestException:
        pytest.skip("Nova-Sim server is not running at localhost:3004")
def test_add_dynamic_camera_announces_state(check_server):
    """Add a camera and verify state stream announcement + env listing.

    Flow:
      1. Open the WebSocket and send a ``client_identity`` message.
      2. POST the camera definition to ``/cameras`` and check the echoed name.
      3. Read WebSocket messages for up to 5 seconds, looking for a ``state``
         message whose ``data.camera_event`` reports ``action == "added"``
         for this camera.
      4. GET ``/env`` and check the camera name appears in ``camera_feeds``.

    Requires a running server; ``check_server`` skips the test otherwise.
    """
    camera_name = "aux_side"
    payload = {
        "name": camera_name,
        "label": "Aux Side View",
        "lookat": [0.55, -0.1, 0.42],
        "distance": 0.9,
        "azimuth": -45,
        "elevation": -30,
        # NOTE(review): presumably lets the test be re-run against a server
        # that already has this camera -- confirm against the endpoint.
        "replace": True,
    }

    with connect(WS_URL, timeout=10) as ws:
        ws.send(json.dumps({
            "type": "client_identity",
            "data": {"client_id": "test-dynamic-camera"}
        }))

        resp = requests.post(f"{BASE_URL}/cameras", json=payload, timeout=5)
        assert resp.status_code == 200
        data = resp.json()
        assert data["camera"]["name"] == camera_name

        found_event = False
        # Use the monotonic clock: a wall-clock adjustment (NTP, manual change)
        # must not shorten or extend the wait window.
        deadline = time.monotonic() + 5
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                # Never block past the overall deadline on a single recv.
                msg = ws.recv(timeout=min(1.0, remaining))
            except TimeoutError:
                continue

            parsed = json.loads(msg)
            if parsed.get("type") != "state":
                continue

            # "data" and "camera" may be missing or null; fall back to {}.
            event = (parsed.get("data") or {}).get("camera_event")
            if not event:
                continue

            announced = (event.get("camera") or {}).get("name")
            if event.get("action") == "added" and announced == camera_name:
                found_event = True
                break

        assert found_event, "Did not receive camera_event state message"

    env_resp = requests.get(f"{BASE_URL}/env", timeout=5)
    env_resp.raise_for_status()
    env_data = env_resp.json()
    feed_names = [feed.get("name") for feed in env_data.get("camera_feeds", [])]
    assert camera_name in feed_names