| import asyncio |
| import logging |
| from typing import Any |
| from datetime import datetime, timezone |
| from unittest.mock import MagicMock |
|
|
| import pytest |
|
|
| import anima_conversation_app.openai_realtime as rt_mod |
| from anima_conversation_app.openai_realtime import OpenaiRealtimeHandler, _compute_response_cost |
| from anima_conversation_app.tools.core_tools import ToolDependencies |
|
|
|
|
| def _build_handler(loop: asyncio.AbstractEventLoop) -> OpenaiRealtimeHandler: |
| asyncio.set_event_loop(loop) |
| deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock()) |
| return OpenaiRealtimeHandler(deps) |
|
|
|
|
| def test_format_timestamp_uses_wall_clock() -> None: |
| """Test that format_timestamp uses wall clock time.""" |
| loop = asyncio.new_event_loop() |
| try: |
| print("Testing format_timestamp...") |
| handler = _build_handler(loop) |
| formatted = handler.format_timestamp() |
| print(f"Formatted timestamp: {formatted}") |
| finally: |
| asyncio.set_event_loop(None) |
| loop.close() |
|
|
| |
| year = int(formatted[1:5]) |
| assert year == datetime.now(timezone.utc).year |
|
|
| @pytest.mark.asyncio |
| async def test_start_up_retries_on_abrupt_close(monkeypatch: Any, caplog: Any) -> None: |
| """First connection dies with ConnectionClosedError during iteration -> retried. |
| |
| Second connection iterates cleanly (no events) -> start_up returns without raising. |
| Ensures handler clears self.connection at the end. |
| """ |
| caplog.set_level(logging.WARNING) |
|
|
| |
| FakeCCE = type("FakeCCE", (Exception,), {}) |
| monkeypatch.setattr(rt_mod, "ConnectionClosedError", FakeCCE) |
|
|
| |
| async def _fast_sleep(*_a: Any, **_kw: Any) -> None: return None |
| monkeypatch.setattr(asyncio, "sleep", _fast_sleep, raising=False) |
|
|
| attempt_counter = {"n": 0} |
|
|
| class FakeConn: |
| """Minimal realtime connection stub.""" |
|
|
| def __init__(self, mode: str): |
| self._mode = mode |
|
|
| class _Session: |
| async def update(self, **_kw: Any) -> None: return None |
| self.session = _Session() |
|
|
| class _InputAudioBuffer: |
| async def append(self, **_kw: Any) -> None: return None |
| self.input_audio_buffer = _InputAudioBuffer() |
|
|
| class _Item: |
| async def create(self, **_kw: Any) -> None: return None |
|
|
| class _Conversation: |
| item = _Item() |
| self.conversation = _Conversation() |
|
|
| class _Response: |
| async def create(self, **_kw: Any) -> None: return None |
| async def cancel(self, **_kw: Any) -> None: return None |
| self.response = _Response() |
|
|
| async def __aenter__(self) -> "FakeConn": return self |
| async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False |
| async def close(self) -> None: return None |
|
|
| |
| def __aiter__(self) -> "FakeConn": return self |
| async def __anext__(self) -> None: |
| if self._mode == "raise_on_iter": |
| raise FakeCCE("abrupt close (simulated)") |
| raise StopAsyncIteration |
|
|
| class FakeRealtime: |
| def connect(self, **_kw: Any) -> FakeConn: |
| attempt_counter["n"] += 1 |
| mode = "raise_on_iter" if attempt_counter["n"] == 1 else "clean" |
| return FakeConn(mode) |
|
|
| class FakeClient: |
| def __init__(self, **_kw: Any) -> None: self.realtime = FakeRealtime() |
|
|
| |
| monkeypatch.setattr(rt_mod, "AsyncOpenAI", FakeClient) |
|
|
| |
| deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock()) |
| handler = rt_mod.OpenaiRealtimeHandler(deps) |
|
|
| |
| await handler.start_up() |
|
|
| |
| assert attempt_counter["n"] == 2 |
| assert handler.connection is None |
|
|
| |
| warnings = [r for r in caplog.records if r.levelname == "WARNING" and "closed unexpectedly" in r.msg] |
| assert len(warnings) == 1 |
|
|
|
|
| |
|
|
|
|
| def _make_usage( |
| audio_in: int | None = 0, |
| text_in: int | None = 0, |
| image_in: int | None = 0, |
| audio_out: int | None = 0, |
| text_out: int | None = 0, |
| has_input: bool = True, |
| has_output: bool = True, |
| ) -> MagicMock: |
| """Build a fake usage object matching the OpenAI response.usage shape.""" |
| usage = MagicMock() |
| if has_input: |
| inp = MagicMock() |
| inp.audio_tokens = audio_in |
| inp.text_tokens = text_in |
| inp.image_tokens = image_in |
| usage.input_token_details = inp |
| else: |
| usage.input_token_details = None |
| if has_output: |
| out = MagicMock() |
| out.audio_tokens = audio_out |
| out.text_tokens = text_out |
| usage.output_token_details = out |
| else: |
| usage.output_token_details = None |
| return usage |
|
|
|
|
| @pytest.mark.parametrize( |
| "usage_kwargs, expect_positive", |
| [ |
| |
| ({"audio_in": 1000, "text_in": 2000, "image_in": 500, "audio_out": 800, "text_out": 300}, True), |
| |
| ({"audio_in": None, "text_in": None, "image_in": None, "audio_out": None, "text_out": None}, False), |
| |
| ({"audio_in": None, "text_in": 500, "image_in": None, "audio_out": 1000, "text_out": None}, True), |
| |
| ({"has_input": False, "has_output": False}, False), |
| ], |
| ids=["normal", "all_none", "mixed", "missing_details"], |
| ) |
| def test_compute_response_cost(usage_kwargs: dict[str, Any], expect_positive: bool) -> None: |
| """Verify _compute_response_cost handles various token combinations without crashing.""" |
| usage = _make_usage(**usage_kwargs) |
| cost = _compute_response_cost(usage) |
| if expect_positive: |
| assert cost > 0 |
| else: |
| assert cost == 0.0 |
|
|