# Source: reachy_mini_minder/tests/test_robot_control_api.py (initial commit af9cde9, author: Boopster)
"""Tests for Robot Control API (TDD).
These tests verify the camera streaming and head control endpoints.
Written first following TDD methodology.
"""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, PropertyMock
import base64
import numpy as np
from fastapi.testclient import TestClient
from fastapi import FastAPI
class TestMoveHeadEndpoint:
    """Exercise ``move_head_endpoint`` (POST /api/robot/move-head) per direction."""

    @pytest.fixture
    def mock_tool_deps(self):
        """Provide a ``MagicMock`` standing in for the tool dependencies.

        The robot getters receive explicit return values and the two
        movement-manager methods used here are pinned to fresh mocks.
        """
        tool_deps = MagicMock()

        # Robot-side getters.
        robot = tool_deps.reachy_mini
        robot.get_current_head_pose.return_value = MagicMock()
        joint_state = (
            MagicMock(),  # arm positions
            (0.0, 0.0, 0.0),  # antennas + body_yaw
        )
        robot.get_current_joint_positions.return_value = joint_state

        # Movement-manager hooks.
        manager = tool_deps.movement_manager
        manager.queue_move = MagicMock()
        manager.set_moving_state = MagicMock()

        return tool_deps

    @staticmethod
    async def _move(direction, deps):
        """Import the endpoint lazily, await it for ``direction``, return its result."""
        from reachy_mini_conversation_app.robot_control_api import move_head_endpoint

        return await move_head_endpoint(direction, deps)

    @pytest.mark.asyncio
    async def test_move_head_left(self, mock_tool_deps):
        """Direction ``left`` yields a status equal to ``looking left``."""
        response = await self._move("left", mock_tool_deps)
        assert response["status"] == "looking left"

    @pytest.mark.asyncio
    async def test_move_head_right(self, mock_tool_deps):
        """Direction ``right`` yields a status containing ``looking right``."""
        response = await self._move("right", mock_tool_deps)
        assert "looking right" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_up(self, mock_tool_deps):
        """Direction ``up`` yields a status containing ``looking up``."""
        response = await self._move("up", mock_tool_deps)
        assert "looking up" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_down(self, mock_tool_deps):
        """Direction ``down`` yields a status containing ``looking down``."""
        response = await self._move("down", mock_tool_deps)
        assert "looking down" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_front(self, mock_tool_deps):
        """Direction ``front`` (centre) yields a status containing ``looking front``."""
        response = await self._move("front", mock_tool_deps)
        assert "looking front" in response["status"]

    @pytest.mark.asyncio
    async def test_move_head_invalid_direction(self, mock_tool_deps):
        """An unsupported direction produces a result carrying an ``error`` key."""
        response = await self._move("backwards", mock_tool_deps)
        assert "error" in response
class TestCameraStreamingStatus:
    """Check the payload returned by ``get_camera_status``."""

    @pytest.mark.asyncio
    async def test_camera_status_returns_streaming_state(self):
        """The status payload carries a boolean ``streaming`` entry."""
        from reachy_mini_conversation_app import robot_control_api as api

        status = await api.get_camera_status()

        assert "streaming" in status
        assert isinstance(status["streaming"], bool)

    @pytest.mark.asyncio
    async def test_camera_status_returns_fps(self):
        """The status payload carries ``target_fps`` set to the designed value."""
        from reachy_mini_conversation_app import robot_control_api as api

        status = await api.get_camera_status()

        assert "target_fps" in status
        # 8 FPS is the rate the authors settled on by design.
        assert status["target_fps"] == 8
class TestCameraFrameEncoding:
    """Cover ``encode_frame_to_base64`` for a valid frame and for ``None``."""

    def test_encode_frame_to_base64(self):
        """A uint8 image encodes to a base64 string whose payload starts like a JPEG."""
        from reachy_mini_conversation_app import robot_control_api as api

        # 100x100 three-channel frame with only the first channel saturated.
        frame = np.full((100, 100, 3), (255, 0, 0), dtype=np.uint8)

        encoded = api.encode_frame_to_base64(frame, quality=70)

        assert isinstance(encoded, str)
        # The text must decode as base64 and begin with the JPEG magic bytes.
        raw = base64.b64decode(encoded)
        assert raw.startswith(b"\xff\xd8")

    def test_encode_frame_returns_none_for_invalid_frame(self):
        """Passing ``None`` as the frame gives ``None`` back rather than raising."""
        from reachy_mini_conversation_app import robot_control_api as api

        encoded = api.encode_frame_to_base64(None, quality=70)

        assert encoded is None
class TestCameraStreamingLoop:
    """Check ``should_stream_frame`` against the enabled flag and the client count."""

    @staticmethod
    def _decide(enabled, clients):
        """Return ``should_stream_frame()`` for a given flag and patched client count.

        The streaming flag is set through ``set_streaming_enabled`` and
        ``ConnectionManager.client_count`` is swapped for a ``PropertyMock``
        returning ``clients`` while the decision function runs.
        """
        from reachy_mini_conversation_app.robot_control_api import (
            set_streaming_enabled,
            should_stream_frame,
        )
        from reachy_mini_conversation_app.stream_api import ConnectionManager

        set_streaming_enabled(enabled)

        fake_count = patch.object(
            ConnectionManager,
            "client_count",
            new_callable=PropertyMock,
            return_value=clients,
        )
        with fake_count:
            return should_stream_frame()

    @pytest.mark.asyncio
    async def test_streaming_only_when_clients_connected(self):
        """Streaming enabled but zero clients: no frame should be emitted."""
        assert self._decide(enabled=True, clients=0) is False

    @pytest.mark.asyncio
    async def test_streaming_when_clients_connected(self):
        """Streaming enabled with one client: frames should be emitted."""
        assert self._decide(enabled=True, clients=1) is True

    @pytest.mark.asyncio
    async def test_streaming_disabled_even_with_clients(self):
        """Streaming disabled with one client: no frame should be emitted."""
        assert self._decide(enabled=False, clients=1) is False