Spaces:
Sleeping
Sleeping
| import asyncio | |
| from types import SimpleNamespace | |
| from unittest.mock import MagicMock | |
| import pytest | |
| import reachy_mini_chat.openai_realtime as rt_mod | |
| from reachy_mini_chat.main import update_chatbot | |
| from reachy_mini_chat.openai_realtime import OpenaiRealtimeHandler | |
| from reachy_mini_chat.tools.core_tools import ToolDependencies | |
def _build_handler() -> OpenaiRealtimeHandler:
    """Build an ``OpenaiRealtimeHandler`` backed entirely by mocks.

    Both ``reachy_mini`` and ``movement_manager`` are ``MagicMock`` objects,
    so the handler can be constructed without real robot hardware.
    """
    deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
    return OpenaiRealtimeHandler(deps)
class _FakeSession:
    """Fake for the connection's ``session`` attribute."""

    async def update(self, **_kwargs) -> None:
        """Accept any keyword arguments and do nothing."""
        return None
class _FakeItemApi:
    """Fake for ``conversation.item`` that records every ``create`` call."""

    def __init__(self) -> None:
        # Keyword arguments of each create() call, in call order.
        self.created: list[dict] = []

    async def create(self, **kwargs) -> None:
        """Record the keyword arguments of this call in ``self.created``."""
        self.created.append(kwargs)
class _FakeConversation:
    """Fake for the connection's ``conversation`` attribute."""

    def __init__(self) -> None:
        # Exposes the recording item API as ``conversation.item``.
        self.item = _FakeItemApi()
class _FakeResponse:
    """Fake for the connection's ``response`` attribute."""

    def __init__(self) -> None:
        # Keyword arguments of each create() call, in call order.
        self.created: list[dict] = []

    async def create(self, **kwargs) -> None:
        """Record the keyword arguments of this call in ``self.created``."""
        self.created.append(kwargs)

    async def cancel(self, **_kwargs) -> None:
        """Accept any keyword arguments and do nothing."""
        return None
class _FakeOutputAudioBuffer:
    """Fake for the connection's ``output_audio_buffer`` attribute."""

    async def clear(self) -> None:
        """Do nothing."""
        return None
class _FakeInputAudioBuffer:
    """Fake for the connection's ``input_audio_buffer`` attribute."""

    async def append(self, **_kwargs) -> None:
        """Accept any keyword arguments and do nothing."""
        return None
class _FakeConn:
    """Fake realtime connection that replays a fixed list of events.

    The object is its own async context manager and its own async iterator:
    ``async with`` yields the connection itself, and ``async for`` returns the
    supplied events in order, then stops.
    """

    def __init__(self, events: list[SimpleNamespace]) -> None:
        self._events = events
        # Index of the next event to hand out from ``_events``.
        self._idx = 0
        self.session = _FakeSession()
        self.conversation = _FakeConversation()
        self.response = _FakeResponse()
        self.output_audio_buffer = _FakeOutputAudioBuffer()
        self.input_audio_buffer = _FakeInputAudioBuffer()

    async def __aenter__(self) -> "_FakeConn":
        """Enter the context; the connection itself is the managed object."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        """Exit the context; returning ``False`` lets exceptions propagate."""
        return False

    def __aiter__(self) -> "_FakeConn":
        """Return ``self`` as the async iterator."""
        return self

    async def __anext__(self) -> SimpleNamespace:
        """Return the next queued event, or stop once all were delivered."""
        if self._idx >= len(self._events):
            raise StopAsyncIteration
        ev = self._events[self._idx]
        self._idx += 1
        return ev

    async def close(self) -> None:
        """Do nothing."""
        return None
class _FakeRealtime:
    """Fake for ``client.realtime`` that always returns one prepared connection."""

    def __init__(self, conn: "_FakeConn") -> None:
        self._conn = conn

    def connect(self, **_kwargs) -> "_FakeConn":
        """Ignore all keyword arguments and return the stored connection."""
        return self._conn
class _FakeClient:
    """Fake client object exposing only a ``realtime`` attribute."""

    def __init__(self, conn: "_FakeConn") -> None:
        # ``realtime.connect(...)`` will hand back ``conn``.
        self.realtime = _FakeRealtime(conn)
def test_smoke_typed_chat_updates_transcript() -> None:
    """A typed user message ends up in both the chat list and the transcript."""
    chat: list[dict] = []
    updated_chat, transcript = update_chatbot(chat, {"role": "user", "content": "hello"})
    assert updated_chat[-1]["content"] == "hello"
    assert "hello" in transcript
    assert "user" in transcript
async def test_smoke_voice_transcript_event_path(monkeypatch) -> None:
    """A completed input-audio transcription event yields a user chat item.

    The handler runs one realtime session against a fake connection that
    delivers a single transcription event; the first item on the output queue
    must carry the transcript as a ``user`` message.
    """
    # NOTE(review): no asyncio marker is visible here; presumably the project
    # configures its pytest async plugin to collect coroutine tests — confirm.
    handler = _build_handler()

    # Replace module-level collaborators so the payload is passed through
    # unwrapped and no real instructions or tool specs are loaded.
    monkeypatch.setattr(rt_mod, "AdditionalOutputs", lambda payload: payload)
    monkeypatch.setattr(rt_mod, "get_session_instructions", lambda: "test instructions")
    monkeypatch.setattr(rt_mod, "get_tool_specs", lambda: [])
    # Substitute an awaitable no-op for the handler's speech method.
    monkeypatch.setattr(handler, "_speak_edge_tts", lambda _text: asyncio.sleep(0))

    event = SimpleNamespace(
        type="conversation.item.input_audio_transcription.completed",
        transcript="hello there",
    )
    conn = _FakeConn([event])
    handler.client = _FakeClient(conn)

    await handler._run_realtime_session()

    # The timeout keeps the test from hanging if nothing was queued.
    item = await asyncio.wait_for(handler.output_queue.get(), timeout=1.0)
    assert item["role"] == "user"
    assert item["content"] == "hello there"
async def test_smoke_connect_disconnect(monkeypatch) -> None:
    """Connecting calls ``start_up`` once; shutdown closes and clears state.

    First half: ``_ensure_connected`` must report success after the patched
    ``start_up`` sets a connection and the connected event.
    Second half: ``shutdown`` must close the connection, reset it to ``None``
    and leave the output queue empty.
    """
    # NOTE(review): no asyncio marker is visible here; presumably the project
    # configures its pytest async plugin to collect coroutine tests — confirm.
    handler = _build_handler()
    called = {"startup": 0}

    async def _fake_startup() -> None:
        # Count invocations and mark the handler as connected.
        called["startup"] += 1
        handler.connection = object()
        handler._connected_event.set()

    monkeypatch.setattr(handler, "start_up", _fake_startup)

    ok = await handler._ensure_connected(timeout_s=0.2)
    assert ok is True
    assert called["startup"] == 1

    class _Closable:
        """Connection stand-in that remembers whether ``close`` was awaited."""

        def __init__(self) -> None:
            self.closed = False

        async def close(self) -> None:
            self.closed = True

    closable = _Closable()
    handler.connection = closable
    # Queue one item so the final assertion can show the queue was drained.
    await handler.output_queue.put({"dummy": True})

    await handler.shutdown()

    assert closable.closed is True
    assert handler.connection is None
    assert handler.output_queue.empty()
async def test_smoke_tool_call_gating_blocks_unrequested_dance(monkeypatch) -> None:
    """A ``dance`` tool call without a user request is blocked, not dispatched.

    The fake connection delivers one ``response.function_call_arguments.done``
    event for ``dance``. The patched ``dispatch_tool_call`` raises if it is
    ever invoked, and the first output item must describe the blocked call.
    """
    # NOTE(review): no asyncio marker is visible here; presumably the project
    # configures its pytest async plugin to collect coroutine tests — confirm.
    handler = _build_handler()

    monkeypatch.setattr(rt_mod, "AdditionalOutputs", lambda payload: payload)
    monkeypatch.setattr(rt_mod, "get_session_instructions", lambda: "test instructions")
    monkeypatch.setattr(rt_mod, "get_tool_specs", lambda: [])

    async def _dispatch_should_not_run(*_args, **_kwargs):
        # Reaching this means the gating let the call through.
        raise AssertionError("dispatch_tool_call should not run for blocked dance call")

    monkeypatch.setattr(rt_mod, "dispatch_tool_call", _dispatch_should_not_run)

    event = SimpleNamespace(
        type="response.function_call_arguments.done",
        name="dance",
        arguments="{}",
        call_id="c1",
    )
    conn = _FakeConn([event])
    handler.client = _FakeClient(conn)

    await handler._run_realtime_session()

    # The timeout keeps the test from hanging if nothing was queued.
    item = await asyncio.wait_for(handler.output_queue.get(), timeout=1.0)
    assert "Blocked tool dance" in item["metadata"]["title"]
    assert "requires explicit user request" in item["content"]