Spaces:
Running
Running
| import asyncio | |
| import logging | |
| from typing import Any | |
| from datetime import datetime, timezone | |
| from unittest.mock import MagicMock | |
| import pytest | |
| import reachy_mini_conversation_app.openai_realtime as rt_mod | |
| from reachy_mini_conversation_app.openai_realtime import OpenaiRealtimeHandler | |
| from reachy_mini_conversation_app.tools.core_tools import ToolDependencies | |
| def _build_handler(loop: asyncio.AbstractEventLoop) -> OpenaiRealtimeHandler: | |
| asyncio.set_event_loop(loop) | |
| deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock()) | |
| return OpenaiRealtimeHandler(deps) | |
| def test_format_timestamp_uses_wall_clock() -> None: | |
| """Test that format_timestamp uses wall clock time.""" | |
| loop = asyncio.new_event_loop() | |
| try: | |
| print("Testing format_timestamp...") | |
| handler = _build_handler(loop) | |
| formatted = handler.format_timestamp() | |
| print(f"Formatted timestamp: {formatted}") | |
| finally: | |
| asyncio.set_event_loop(None) | |
| loop.close() | |
| # Extract year from "[YYYY-MM-DD ...]" | |
| year = int(formatted[1:5]) | |
| assert year == datetime.now(timezone.utc).year | |
| async def test_start_up_retries_on_abrupt_close(monkeypatch: Any, caplog: Any) -> None: | |
| """First connection dies with ConnectionClosedError during iteration -> retried. | |
| Second connection iterates cleanly (no events) -> start_up returns without raising. | |
| Ensures handler clears self.connection at the end. | |
| """ | |
| caplog.set_level(logging.WARNING) | |
| # Use a local Exception as the module's ConnectionClosedError to avoid ws dependency | |
| FakeCCE = type("FakeCCE", (Exception,), {}) | |
| monkeypatch.setattr(rt_mod, "ConnectionClosedError", FakeCCE) | |
| # Make asyncio.sleep return immediately (for backoff) | |
| async def _fast_sleep(*_a: Any, **_kw: Any) -> None: return None | |
| monkeypatch.setattr(asyncio, "sleep", _fast_sleep, raising=False) | |
| attempt_counter = {"n": 0} | |
| class FakeConn: | |
| """Minimal realtime connection stub.""" | |
| def __init__(self, mode: str): | |
| self._mode = mode | |
| class _Session: | |
| async def update(self, **_kw: Any) -> None: return None | |
| self.session = _Session() | |
| class _InputAudioBuffer: | |
| async def append(self, **_kw: Any) -> None: return None | |
| self.input_audio_buffer = _InputAudioBuffer() | |
| class _Item: | |
| async def create(self, **_kw: Any) -> None: return None | |
| class _Conversation: | |
| item = _Item() | |
| self.conversation = _Conversation() | |
| class _Response: | |
| async def create(self, **_kw: Any) -> None: return None | |
| async def cancel(self, **_kw: Any) -> None: return None | |
| self.response = _Response() | |
| async def __aenter__(self) -> "FakeConn": return self | |
| async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False | |
| async def close(self) -> None: return None | |
| # Async iterator protocol | |
| def __aiter__(self) -> "FakeConn": return self | |
| async def __anext__(self) -> None: | |
| if self._mode == "raise_on_iter": | |
| raise FakeCCE("abrupt close (simulated)") | |
| raise StopAsyncIteration # clean exit (no events) | |
| class FakeRealtime: | |
| def connect(self, **_kw: Any) -> FakeConn: | |
| attempt_counter["n"] += 1 | |
| mode = "raise_on_iter" if attempt_counter["n"] == 1 else "clean" | |
| return FakeConn(mode) | |
| class FakeClient: | |
| def __init__(self, **_kw: Any) -> None: self.realtime = FakeRealtime() | |
| # Patch the OpenAI client used by the handler | |
| monkeypatch.setattr(rt_mod, "AsyncOpenAI", FakeClient) | |
| # Build handler with minimal deps | |
| deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock()) | |
| handler = rt_mod.OpenaiRealtimeHandler(deps) | |
| # Run: should retry once and exit cleanly | |
| await handler.start_up() | |
| # Validate: two attempts total (fail -> retry -> succeed), and connection cleared | |
| assert attempt_counter["n"] == 2 | |
| assert handler.connection is None | |
| # Optional: confirm we logged the unexpected close once | |
| warnings = [r for r in caplog.records if r.levelname == "WARNING" and "closed unexpectedly" in r.msg] | |
| assert len(warnings) == 1 | |