File size: 2,435 Bytes
2425670
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Tests for dynamic camera endpoint and state stream announcements.

Run with server running:
    python nova-sim/mujoco_server.py

Then in another terminal:
    pytest nova-sim/tests/test_dynamic_camera.py -v
"""

import json
import time
import pytest
import requests
from websockets.sync.client import connect


BASE_URL = "http://localhost:3004/nova-sim/api/v1"
WS_URL = "ws://localhost:3004/nova-sim/api/v1/ws"


@pytest.fixture(scope="module")
def check_server():
    """Check if the server is running before tests."""
    try:
        response = requests.get(f"{BASE_URL}/metadata", timeout=2)
        response.raise_for_status()
    except requests.RequestException:
        pytest.skip("Nova-Sim server is not running at localhost:3004")


def test_add_dynamic_camera_announces_state(check_server):
    """Add a camera and verify state stream announcement + env listing."""
    camera_name = "aux_side"
    payload = {
        "name": camera_name,
        "label": "Aux Side View",
        "lookat": [0.55, -0.1, 0.42],
        "distance": 0.9,
        "azimuth": -45,
        "elevation": -30,
        "replace": True,
    }

    with connect(WS_URL, timeout=10) as ws:
        ws.send(json.dumps({
            "type": "client_identity",
            "data": {"client_id": "test-dynamic-camera"}
        }))

        resp = requests.post(f"{BASE_URL}/cameras", json=payload, timeout=5)
        assert resp.status_code == 200
        data = resp.json()
        assert data["camera"]["name"] == camera_name

        found_event = False
        deadline = time.time() + 5
        while time.time() < deadline:
            try:
                msg = ws.recv(timeout=1)
            except TimeoutError:
                continue
            parsed = json.loads(msg)
            if parsed.get("type") != "state":
                continue
            event = (parsed.get("data") or {}).get("camera_event")
            if not event:
                continue
            if event.get("action") == "added" and (event.get("camera") or {}).get("name") == camera_name:
                found_event = True
                break

        assert found_event, "Did not receive camera_event state message"

    env_resp = requests.get(f"{BASE_URL}/env", timeout=5)
    env_resp.raise_for_status()
    env_data = env_resp.json()
    feed_names = [feed.get("name") for feed in env_data.get("camera_feeds", [])]
    assert camera_name in feed_names