Spaces:
Running
Running
| """Tests for Robot Control API (TDD). | |
| These tests verify the camera streaming and head control endpoints. | |
| Written first following TDD methodology. | |
| """ | |
| import pytest | |
| from unittest.mock import MagicMock, AsyncMock, patch, PropertyMock | |
| import base64 | |
| import numpy as np | |
| from fastapi.testclient import TestClient | |
| from fastapi import FastAPI | |
| class TestMoveHeadEndpoint: | |
| """Tests for POST /api/robot/move-head endpoint.""" | |
| def mock_tool_deps(self): | |
| """Create mock tool dependencies with proper return values.""" | |
| deps = MagicMock() | |
| # Mock reachy_mini methods | |
| deps.reachy_mini.get_current_head_pose.return_value = MagicMock() | |
| deps.reachy_mini.get_current_joint_positions.return_value = ( | |
| MagicMock(), # arm positions | |
| (0.0, 0.0, 0.0), # antennas + body_yaw | |
| ) | |
| # Mock movement_manager | |
| deps.movement_manager.queue_move = MagicMock() | |
| deps.movement_manager.set_moving_state = MagicMock() | |
| return deps | |
| async def test_move_head_left(self, mock_tool_deps): | |
| """Moving head left should call move_head tool with direction='left'.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("left", mock_tool_deps) | |
| assert result["status"] == "looking left" | |
| async def test_move_head_right(self, mock_tool_deps): | |
| """Moving head right should return success status.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("right", mock_tool_deps) | |
| assert "looking right" in result["status"] | |
| async def test_move_head_up(self, mock_tool_deps): | |
| """Moving head up should return success status.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("up", mock_tool_deps) | |
| assert "looking up" in result["status"] | |
| async def test_move_head_down(self, mock_tool_deps): | |
| """Moving head down should return success status.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("down", mock_tool_deps) | |
| assert "looking down" in result["status"] | |
| async def test_move_head_front(self, mock_tool_deps): | |
| """Moving head to front (center) should return success status.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("front", mock_tool_deps) | |
| assert "looking front" in result["status"] | |
| async def test_move_head_invalid_direction(self, mock_tool_deps): | |
| """Invalid direction should return error.""" | |
| from reachy_mini_conversation_app.robot_control_api import move_head_endpoint | |
| result = await move_head_endpoint("backwards", mock_tool_deps) | |
| assert "error" in result | |
| class TestCameraStreamingStatus: | |
| """Tests for camera streaming status endpoint.""" | |
| async def test_camera_status_returns_streaming_state(self): | |
| """Camera status should report whether streaming is active.""" | |
| from reachy_mini_conversation_app.robot_control_api import get_camera_status | |
| result = await get_camera_status() | |
| assert "streaming" in result | |
| assert isinstance(result["streaming"], bool) | |
| async def test_camera_status_returns_fps(self): | |
| """Camera status should report target FPS.""" | |
| from reachy_mini_conversation_app.robot_control_api import get_camera_status | |
| result = await get_camera_status() | |
| assert "target_fps" in result | |
| assert result["target_fps"] == 8 # Our design decision | |
| class TestCameraFrameEncoding: | |
| """Tests for camera frame encoding utilities.""" | |
| def test_encode_frame_to_base64(self): | |
| """Frame encoding should produce valid base64 JPEG.""" | |
| from reachy_mini_conversation_app.robot_control_api import ( | |
| encode_frame_to_base64, | |
| ) | |
| # Create a simple test frame (100x100 RGB) | |
| test_frame = np.zeros((100, 100, 3), dtype=np.uint8) | |
| test_frame[:, :] = [255, 0, 0] # Red frame | |
| result = encode_frame_to_base64(test_frame, quality=70) | |
| # Should be valid base64 | |
| assert isinstance(result, str) | |
| decoded = base64.b64decode(result) | |
| # JPEG magic bytes | |
| assert decoded[:2] == b"\xff\xd8" | |
| def test_encode_frame_returns_none_for_invalid_frame(self): | |
| """Encoding invalid frame should return None, not crash.""" | |
| from reachy_mini_conversation_app.robot_control_api import ( | |
| encode_frame_to_base64, | |
| ) | |
| result = encode_frame_to_base64(None, quality=70) | |
| assert result is None | |
| class TestCameraStreamingLoop: | |
| """Tests for the camera streaming background task.""" | |
| async def test_streaming_only_when_clients_connected(self): | |
| """Should not emit frames when no WebSocket clients are connected.""" | |
| from reachy_mini_conversation_app.robot_control_api import ( | |
| should_stream_frame, | |
| set_streaming_enabled, | |
| ) | |
| from reachy_mini_conversation_app.stream_api import ConnectionManager | |
| # Enable streaming | |
| set_streaming_enabled(True) | |
| # Mock client_count property to return 0 | |
| with patch.object( | |
| ConnectionManager, "client_count", new_callable=PropertyMock, return_value=0 | |
| ): | |
| assert should_stream_frame() is False | |
| async def test_streaming_when_clients_connected(self): | |
| """Should emit frames when WebSocket clients are connected.""" | |
| from reachy_mini_conversation_app.robot_control_api import ( | |
| should_stream_frame, | |
| set_streaming_enabled, | |
| ) | |
| from reachy_mini_conversation_app.stream_api import ConnectionManager | |
| # Enable streaming | |
| set_streaming_enabled(True) | |
| # Mock client_count property to return 1 | |
| with patch.object( | |
| ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1 | |
| ): | |
| assert should_stream_frame() is True | |
| async def test_streaming_disabled_even_with_clients(self): | |
| """Should not stream when streaming is disabled, even with clients.""" | |
| from reachy_mini_conversation_app.robot_control_api import ( | |
| should_stream_frame, | |
| set_streaming_enabled, | |
| ) | |
| from reachy_mini_conversation_app.stream_api import ConnectionManager | |
| # Disable streaming | |
| set_streaming_enabled(False) | |
| # Mock client_count property to return 1 | |
| with patch.object( | |
| ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1 | |
| ): | |
| assert should_stream_frame() is False | |