"""Tests for Robot Control API (TDD).

These tests verify the camera streaming and head control endpoints.
Written first following TDD methodology.

The functions under test are imported inside each test body, so an import
failure in ``robot_control_api`` surfaces as individual test failures rather
than as a collection error for the whole module.
"""

import base64
from unittest.mock import MagicMock, AsyncMock, patch, PropertyMock

import numpy as np
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient


class TestMoveHeadEndpoint:
    """Tests for POST /api/robot/move-head endpoint."""

    @pytest.fixture
    def mock_tool_deps(self):
        """Create mock tool dependencies with proper return values.

        Returns:
            A ``MagicMock`` whose ``reachy_mini`` and ``movement_manager``
            attributes are pre-configured with the return values below.
        """
        deps = MagicMock()

        # Mock reachy_mini methods
        deps.reachy_mini.get_current_head_pose.return_value = MagicMock()
        deps.reachy_mini.get_current_joint_positions.return_value = (
            MagicMock(),  # arm positions
            (0.0, 0.0, 0.0),  # antennas + body_yaw
        )

        # Mock movement_manager
        deps.movement_manager.queue_move = MagicMock()
        deps.movement_manager.set_moving_state = MagicMock()
        return deps

    @pytest.mark.asyncio
    async def test_move_head_left(self, mock_tool_deps):
        """Moving head left should return exactly the 'looking left' status.

        Only the returned status is checked; no assertion is made about
        which calls were issued on the mocked dependencies.
        """
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("left", mock_tool_deps)
        assert result["status"] == "looking left"

    @pytest.mark.asyncio
    async def test_move_head_right(self, mock_tool_deps):
        """Moving head right should return success status."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("right", mock_tool_deps)
        assert "looking right" in result["status"]

    @pytest.mark.asyncio
    async def test_move_head_up(self, mock_tool_deps):
        """Moving head up should return success status."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("up", mock_tool_deps)
        assert "looking up" in result["status"]

    @pytest.mark.asyncio
    async def test_move_head_down(self, mock_tool_deps):
        """Moving head down should return success status."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("down", mock_tool_deps)
        assert "looking down" in result["status"]

    @pytest.mark.asyncio
    async def test_move_head_front(self, mock_tool_deps):
        """Moving head to front (center) should return success status."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("front", mock_tool_deps)
        assert "looking front" in result["status"]

    @pytest.mark.asyncio
    async def test_move_head_invalid_direction(self, mock_tool_deps):
        """Invalid direction should return a result containing 'error'."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        result = await move_head_endpoint("backwards", mock_tool_deps)
        assert "error" in result


class TestCameraStreamingStatus:
    """Tests for camera streaming status endpoint."""

    @pytest.mark.asyncio
    async def test_camera_status_returns_streaming_state(self):
        """Camera status should report whether streaming is active."""
        from reachy_mini_conversation_app.robot_control_api import get_camera_status

        result = await get_camera_status()
        assert "streaming" in result
        assert isinstance(result["streaming"], bool)

    @pytest.mark.asyncio
    async def test_camera_status_returns_fps(self):
        """Camera status should report target FPS."""
        from reachy_mini_conversation_app.robot_control_api import get_camera_status

        result = await get_camera_status()
        assert "target_fps" in result
        assert result["target_fps"] == 8  # Our design decision


class TestCameraFrameEncoding:
    """Tests for camera frame encoding utilities."""

    def test_encode_frame_to_base64(self):
        """Frame encoding should produce a strictly valid base64 JPEG string."""
        from reachy_mini_conversation_app.robot_control_api import (
            encode_frame_to_base64,
        )

        # Create a simple test frame: 100x100 pixels, 3 channels, uint8
        test_frame = np.zeros((100, 100, 3), dtype=np.uint8)
        test_frame[:, :] = [255, 0, 0]  # Red frame

        result = encode_frame_to_base64(test_frame, quality=70)

        # Should be valid base64
        assert isinstance(result, str)
        # validate=True is required for this to be a real check: by default
        # b64decode silently discards characters outside the base64 alphabet,
        # so malformed output could still decode without error.
        decoded = base64.b64decode(result, validate=True)

        # JPEG magic bytes
        assert decoded[:2] == b"\xff\xd8"

    def test_encode_frame_returns_none_for_invalid_frame(self):
        """Encoding invalid frame should return None, not crash."""
        from reachy_mini_conversation_app.robot_control_api import (
            encode_frame_to_base64,
        )

        result = encode_frame_to_base64(None, quality=70)
        assert result is None


class TestCameraStreamingLoop:
    """Tests for the camera streaming background task.

    Each test sets the streaming flag explicitly before asserting, so the
    tests in this class do not depend on one another's ordering.

    NOTE(review): ``set_streaming_enabled`` is never restored afterwards. If
    it mutates module-level state, the last value set here leaks into tests
    that run later — confirm, and consider a restoring fixture.
    """

    @pytest.mark.asyncio
    async def test_streaming_only_when_clients_connected(self):
        """Should not emit frames when no WebSocket clients are connected."""
        from reachy_mini_conversation_app.robot_control_api import (
            should_stream_frame,
            set_streaming_enabled,
        )
        from reachy_mini_conversation_app.stream_api import ConnectionManager

        # Enable streaming
        set_streaming_enabled(True)

        # Mock client_count property to return 0
        with patch.object(
            ConnectionManager, "client_count", new_callable=PropertyMock, return_value=0
        ):
            assert should_stream_frame() is False

    @pytest.mark.asyncio
    async def test_streaming_when_clients_connected(self):
        """Should emit frames when WebSocket clients are connected."""
        from reachy_mini_conversation_app.robot_control_api import (
            should_stream_frame,
            set_streaming_enabled,
        )
        from reachy_mini_conversation_app.stream_api import ConnectionManager

        # Enable streaming
        set_streaming_enabled(True)

        # Mock client_count property to return 1
        with patch.object(
            ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1
        ):
            assert should_stream_frame() is True

    @pytest.mark.asyncio
    async def test_streaming_disabled_even_with_clients(self):
        """Should not stream when streaming is disabled, even with clients."""
        from reachy_mini_conversation_app.robot_control_api import (
            should_stream_frame,
            set_streaming_enabled,
        )
        from reachy_mini_conversation_app.stream_api import ConnectionManager

        # Disable streaming
        set_streaming_enabled(False)

        # Mock client_count property to return 1
        with patch.object(
            ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1
        ):
            assert should_stream_frame() is False