File size: 7,159 Bytes
af9cde9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""Tests for Robot Control API (TDD).

These tests verify the camera streaming and head control endpoints.
Written first following TDD methodology.
"""

import pytest
from unittest.mock import MagicMock, AsyncMock, patch, PropertyMock
import base64
import numpy as np

from fastapi.testclient import TestClient
from fastapi import FastAPI


class TestMoveHeadEndpoint:
    """Exercise ``move_head_endpoint`` (handler for POST /api/robot/move-head).

    Each test awaits the handler directly with a direction string and a mocked
    dependency bundle, then inspects the mapping it returns.
    """

    @pytest.fixture
    def mock_tool_deps(self):
        """Return a ``MagicMock`` standing in for the tool dependencies.

        The robot getters receive explicit return values and the movement
        manager hooks are replaced with plain mocks.
        """
        deps = MagicMock()

        # Configure the auto-created child mock that plays the robot.
        robot = deps.reachy_mini
        robot.get_current_head_pose.return_value = MagicMock()
        # Two-item tuple: an opaque first element and three zeroed floats
        # (presumably joint data followed by antennas + body yaw -- confirm
        # against the robot API).
        joint_reading = (MagicMock(), (0.0, 0.0, 0.0))
        robot.get_current_joint_positions.return_value = joint_reading

        # Explicit mocks for the movement manager entry points.
        manager = deps.movement_manager
        manager.queue_move = MagicMock()
        manager.set_moving_state = MagicMock()

        return deps

    @staticmethod
    async def _request_move(direction, deps):
        """Import the endpoint lazily, await it for ``direction``, return its result."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        return await move_head_endpoint(direction, deps)

    @pytest.mark.asyncio
    async def test_move_head_left(self, mock_tool_deps):
        """A 'left' request reports exactly the status 'looking left'."""
        response = await self._request_move("left", mock_tool_deps)
        assert response["status"] == "looking left"

    @pytest.mark.asyncio
    async def test_move_head_right(self, mock_tool_deps):
        """A 'right' request reports a status containing 'looking right'."""
        response = await self._request_move("right", mock_tool_deps)
        assert "looking right" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_up(self, mock_tool_deps):
        """An 'up' request reports a status containing 'looking up'."""
        response = await self._request_move("up", mock_tool_deps)
        assert "looking up" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_down(self, mock_tool_deps):
        """A 'down' request reports a status containing 'looking down'."""
        response = await self._request_move("down", mock_tool_deps)
        assert "looking down" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_front(self, mock_tool_deps):
        """A 'front' (centre) request reports a status containing 'looking front'."""
        response = await self._request_move("front", mock_tool_deps)
        assert "looking front" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_invalid_direction(self, mock_tool_deps):
        """An unsupported direction produces a result holding an 'error' entry."""
        response = await self._request_move("backwards", mock_tool_deps)
        assert "error" in response


class TestCameraStreamingStatus:
    """Check the payload returned by ``get_camera_status``."""

    @pytest.mark.asyncio
    async def test_camera_status_returns_streaming_state(self):
        """The status payload exposes a boolean under the 'streaming' key."""
        from reachy_mini_conversation_app.robot_control_api import (
            get_camera_status as fetch_status,
        )

        payload = await fetch_status()
        assert "streaming" in payload
        streaming_flag = payload["streaming"]
        assert isinstance(streaming_flag, bool)

    @pytest.mark.asyncio
    async def test_camera_status_returns_fps(self):
        """The status payload exposes the target frame rate under 'target_fps'."""
        from reachy_mini_conversation_app.robot_control_api import (
            get_camera_status as fetch_status,
        )

        payload = await fetch_status()
        assert "target_fps" in payload
        # 8 is the frame rate pinned here as the project's design decision.
        assert payload["target_fps"] == 8


class TestCameraFrameEncoding:
    """Check the ``encode_frame_to_base64`` helper."""

    def test_encode_frame_to_base64(self):
        """Encoding a solid-colour frame yields a base64 string wrapping a JPEG."""
        from reachy_mini_conversation_app.robot_control_api import (
            encode_frame_to_base64 as encode,
        )

        # 100x100 three-channel uint8 image: first channel saturated, the rest
        # zero (red when the channels are read as RGB).
        frame = np.zeros((100, 100, 3), dtype=np.uint8)
        frame[..., 0] = 255

        encoded = encode(frame, quality=70)

        # The text must decode as base64 ...
        assert isinstance(encoded, str)
        raw = base64.b64decode(encoded)
        # ... and the decoded bytes must open with the JPEG start-of-image marker.
        assert raw.startswith(b"\xff\xd8")

    def test_encode_frame_returns_none_for_invalid_frame(self):
        """Passing ``None`` as the frame gives ``None`` back rather than raising."""
        from reachy_mini_conversation_app.robot_control_api import (
            encode_frame_to_base64 as encode,
        )

        encoded = encode(None, quality=70)
        assert encoded is None


class TestCameraStreamingLoop:
    """Check the ``should_stream_frame`` gate used by the camera streaming task."""

    @staticmethod
    def _stream_decision(enabled, clients):
        """Return ``should_stream_frame()`` for a given flag and client count.

        The streaming flag is set through ``set_streaming_enabled`` first; then
        ``ConnectionManager.client_count`` is patched with a ``PropertyMock``
        reporting ``clients`` while the decision is evaluated.
        """
        from reachy_mini_conversation_app.robot_control_api import (
            should_stream_frame,
            set_streaming_enabled,
        )
        from reachy_mini_conversation_app.stream_api import ConnectionManager

        set_streaming_enabled(enabled)

        count_patch = patch.object(
            ConnectionManager,
            "client_count",
            new_callable=PropertyMock,
            return_value=clients,
        )
        with count_patch:
            return should_stream_frame()

    @pytest.mark.asyncio
    async def test_streaming_only_when_clients_connected(self):
        """Streaming enabled but zero clients: no frame should be emitted."""
        decision = self._stream_decision(True, 0)
        assert decision is False

    @pytest.mark.asyncio
    async def test_streaming_when_clients_connected(self):
        """Streaming enabled and one client: frames should be emitted."""
        decision = self._stream_decision(True, 1)
        assert decision is True

    @pytest.mark.asyncio
    async def test_streaming_disabled_even_with_clients(self):
        """Streaming disabled: no frame should be emitted even with a client."""
        decision = self._stream_decision(False, 1)
        assert decision is False