reachy-mini-chat / tests /test_smoke_flows.py
itseffi's picture
v1
def8c60
import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
import reachy_mini_chat.openai_realtime as rt_mod
from reachy_mini_chat.main import update_chatbot
from reachy_mini_chat.openai_realtime import OpenaiRealtimeHandler
from reachy_mini_chat.tools.core_tools import ToolDependencies
def _build_handler() -> OpenaiRealtimeHandler:
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
return OpenaiRealtimeHandler(deps)
class _FakeSession:
async def update(self, **_kwargs) -> None:
return None
class _FakeItemApi:
def __init__(self) -> None:
self.created: list[dict] = []
async def create(self, **kwargs) -> None:
self.created.append(kwargs)
class _FakeConversation:
def __init__(self) -> None:
self.item = _FakeItemApi()
class _FakeResponse:
def __init__(self) -> None:
self.created: list[dict] = []
async def create(self, **kwargs) -> None:
self.created.append(kwargs)
async def cancel(self, **_kwargs) -> None:
return None
class _FakeOutputAudioBuffer:
async def clear(self) -> None:
return None
class _FakeInputAudioBuffer:
async def append(self, **_kwargs) -> None:
return None
class _FakeConn:
def __init__(self, events: list[SimpleNamespace]) -> None:
self._events = events
self._idx = 0
self.session = _FakeSession()
self.conversation = _FakeConversation()
self.response = _FakeResponse()
self.output_audio_buffer = _FakeOutputAudioBuffer()
self.input_audio_buffer = _FakeInputAudioBuffer()
async def __aenter__(self) -> "_FakeConn":
return self
async def __aexit__(self, exc_type, exc, tb) -> bool:
return False
def __aiter__(self) -> "_FakeConn":
return self
async def __anext__(self) -> SimpleNamespace:
if self._idx >= len(self._events):
raise StopAsyncIteration
ev = self._events[self._idx]
self._idx += 1
return ev
async def close(self) -> None:
return None
class _FakeRealtime:
def __init__(self, conn: _FakeConn) -> None:
self._conn = conn
def connect(self, **_kwargs) -> _FakeConn:
return self._conn
class _FakeClient:
def __init__(self, conn: _FakeConn) -> None:
self.realtime = _FakeRealtime(conn)
def test_smoke_typed_chat_updates_transcript() -> None:
chat: list[dict] = []
updated_chat, transcript = update_chatbot(chat, {"role": "user", "content": "hello"})
assert updated_chat[-1]["content"] == "hello"
assert "hello" in transcript
assert "user" in transcript
@pytest.mark.asyncio
async def test_smoke_voice_transcript_event_path(monkeypatch) -> None:
handler = _build_handler()
monkeypatch.setattr(rt_mod, "AdditionalOutputs", lambda payload: payload)
monkeypatch.setattr(rt_mod, "get_session_instructions", lambda: "test instructions")
monkeypatch.setattr(rt_mod, "get_tool_specs", lambda: [])
monkeypatch.setattr(handler, "_speak_edge_tts", lambda _text: asyncio.sleep(0))
event = SimpleNamespace(type="conversation.item.input_audio_transcription.completed", transcript="hello there")
conn = _FakeConn([event])
handler.client = _FakeClient(conn)
await handler._run_realtime_session()
item = await asyncio.wait_for(handler.output_queue.get(), timeout=1.0)
assert item["role"] == "user"
assert item["content"] == "hello there"
@pytest.mark.asyncio
async def test_smoke_connect_disconnect(monkeypatch) -> None:
handler = _build_handler()
called = {"startup": 0}
async def _fake_startup() -> None:
called["startup"] += 1
handler.connection = object()
handler._connected_event.set()
monkeypatch.setattr(handler, "start_up", _fake_startup)
ok = await handler._ensure_connected(timeout_s=0.2)
assert ok is True
assert called["startup"] == 1
class _Closable:
def __init__(self) -> None:
self.closed = False
async def close(self) -> None:
self.closed = True
closable = _Closable()
handler.connection = closable
await handler.output_queue.put({"dummy": True})
await handler.shutdown()
assert closable.closed is True
assert handler.connection is None
assert handler.output_queue.empty()
@pytest.mark.asyncio
async def test_smoke_tool_call_gating_blocks_unrequested_dance(monkeypatch) -> None:
handler = _build_handler()
monkeypatch.setattr(rt_mod, "AdditionalOutputs", lambda payload: payload)
monkeypatch.setattr(rt_mod, "get_session_instructions", lambda: "test instructions")
monkeypatch.setattr(rt_mod, "get_tool_specs", lambda: [])
async def _dispatch_should_not_run(*_args, **_kwargs):
raise AssertionError("dispatch_tool_call should not run for blocked dance call")
monkeypatch.setattr(rt_mod, "dispatch_tool_call", _dispatch_should_not_run)
event = SimpleNamespace(type="response.function_call_arguments.done", name="dance", arguments="{}", call_id="c1")
conn = _FakeConn([event])
handler.client = _FakeClient(conn)
await handler._run_realtime_session()
item = await asyncio.wait_for(handler.output_queue.get(), timeout=1.0)
assert "Blocked tool dance" in item["metadata"]["title"]
assert "requires explicit user request" in item["content"]