Spaces:
Running
Running
File size: 7,159 Bytes
af9cde9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | """Tests for Robot Control API (TDD).
These tests verify the camera streaming and head control endpoints.
Written first following TDD methodology.
"""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, PropertyMock
import base64
import numpy as np
from fastapi.testclient import TestClient
from fastapi import FastAPI
class TestMoveHeadEndpoint:
"""Tests for POST /api/robot/move-head endpoint."""
@pytest.fixture
def mock_tool_deps(self):
"""Create mock tool dependencies with proper return values."""
deps = MagicMock()
# Mock reachy_mini methods
deps.reachy_mini.get_current_head_pose.return_value = MagicMock()
deps.reachy_mini.get_current_joint_positions.return_value = (
MagicMock(), # arm positions
(0.0, 0.0, 0.0), # antennas + body_yaw
)
# Mock movement_manager
deps.movement_manager.queue_move = MagicMock()
deps.movement_manager.set_moving_state = MagicMock()
return deps
@pytest.mark.asyncio
async def test_move_head_left(self, mock_tool_deps):
"""Moving head left should call move_head tool with direction='left'."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("left", mock_tool_deps)
assert result["status"] == "looking left"
@pytest.mark.asyncio
async def test_move_head_right(self, mock_tool_deps):
"""Moving head right should return success status."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("right", mock_tool_deps)
assert "looking right" in result["status"]
@pytest.mark.asyncio
async def test_move_head_up(self, mock_tool_deps):
"""Moving head up should return success status."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("up", mock_tool_deps)
assert "looking up" in result["status"]
@pytest.mark.asyncio
async def test_move_head_down(self, mock_tool_deps):
"""Moving head down should return success status."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("down", mock_tool_deps)
assert "looking down" in result["status"]
@pytest.mark.asyncio
async def test_move_head_front(self, mock_tool_deps):
"""Moving head to front (center) should return success status."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("front", mock_tool_deps)
assert "looking front" in result["status"]
@pytest.mark.asyncio
async def test_move_head_invalid_direction(self, mock_tool_deps):
"""Invalid direction should return error."""
from reachy_mini_conversation_app.robot_control_api import move_head_endpoint
result = await move_head_endpoint("backwards", mock_tool_deps)
assert "error" in result
class TestCameraStreamingStatus:
"""Tests for camera streaming status endpoint."""
@pytest.mark.asyncio
async def test_camera_status_returns_streaming_state(self):
"""Camera status should report whether streaming is active."""
from reachy_mini_conversation_app.robot_control_api import get_camera_status
result = await get_camera_status()
assert "streaming" in result
assert isinstance(result["streaming"], bool)
@pytest.mark.asyncio
async def test_camera_status_returns_fps(self):
"""Camera status should report target FPS."""
from reachy_mini_conversation_app.robot_control_api import get_camera_status
result = await get_camera_status()
assert "target_fps" in result
assert result["target_fps"] == 8 # Our design decision
class TestCameraFrameEncoding:
"""Tests for camera frame encoding utilities."""
def test_encode_frame_to_base64(self):
"""Frame encoding should produce valid base64 JPEG."""
from reachy_mini_conversation_app.robot_control_api import (
encode_frame_to_base64,
)
# Create a simple test frame (100x100 RGB)
test_frame = np.zeros((100, 100, 3), dtype=np.uint8)
test_frame[:, :] = [255, 0, 0] # Red frame
result = encode_frame_to_base64(test_frame, quality=70)
# Should be valid base64
assert isinstance(result, str)
decoded = base64.b64decode(result)
# JPEG magic bytes
assert decoded[:2] == b"\xff\xd8"
def test_encode_frame_returns_none_for_invalid_frame(self):
"""Encoding invalid frame should return None, not crash."""
from reachy_mini_conversation_app.robot_control_api import (
encode_frame_to_base64,
)
result = encode_frame_to_base64(None, quality=70)
assert result is None
class TestCameraStreamingLoop:
"""Tests for the camera streaming background task."""
@pytest.mark.asyncio
async def test_streaming_only_when_clients_connected(self):
"""Should not emit frames when no WebSocket clients are connected."""
from reachy_mini_conversation_app.robot_control_api import (
should_stream_frame,
set_streaming_enabled,
)
from reachy_mini_conversation_app.stream_api import ConnectionManager
# Enable streaming
set_streaming_enabled(True)
# Mock client_count property to return 0
with patch.object(
ConnectionManager, "client_count", new_callable=PropertyMock, return_value=0
):
assert should_stream_frame() is False
@pytest.mark.asyncio
async def test_streaming_when_clients_connected(self):
"""Should emit frames when WebSocket clients are connected."""
from reachy_mini_conversation_app.robot_control_api import (
should_stream_frame,
set_streaming_enabled,
)
from reachy_mini_conversation_app.stream_api import ConnectionManager
# Enable streaming
set_streaming_enabled(True)
# Mock client_count property to return 1
with patch.object(
ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1
):
assert should_stream_frame() is True
@pytest.mark.asyncio
async def test_streaming_disabled_even_with_clients(self):
"""Should not stream when streaming is disabled, even with clients."""
from reachy_mini_conversation_app.robot_control_api import (
should_stream_frame,
set_streaming_enabled,
)
from reachy_mini_conversation_app.stream_api import ConnectionManager
# Disable streaming
set_streaming_enabled(False)
# Mock client_count property to return 1
with patch.object(
ConnectionManager, "client_count", new_callable=PropertyMock, return_value=1
):
assert should_stream_frame() is False
|