# anima_conversation_app/tests/test_openai_realtime.py
import asyncio
import logging
from typing import Any
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
import anima_conversation_app.openai_realtime as rt_mod
from anima_conversation_app.openai_realtime import OpenaiRealtimeHandler, _compute_response_cost
from anima_conversation_app.tools.core_tools import ToolDependencies
def _build_handler(loop: asyncio.AbstractEventLoop) -> OpenaiRealtimeHandler:
asyncio.set_event_loop(loop)
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
return OpenaiRealtimeHandler(deps)
def test_format_timestamp_uses_wall_clock() -> None:
    """format_timestamp should report the current wall-clock (UTC) year."""
    loop = asyncio.new_event_loop()
    try:
        handler = _build_handler(loop)
        formatted = handler.format_timestamp()
    finally:
        asyncio.set_event_loop(None)
        loop.close()
    # Extract the year from the leading "[YYYY-MM-DD ...]" prefix.
    year = int(formatted[1:5])
    assert year == datetime.now(timezone.utc).year
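
# A hedged follow-up sketch (not in the original suite): assuming the timestamp
# really begins with a "[YYYY-MM-DD" prefix, as the slicing above implies, the
# date portion should also round-trip through datetime.strptime.
def test_format_timestamp_date_parses() -> None:
    """Sketch: the leading date field should parse as a valid calendar date."""
    loop = asyncio.new_event_loop()
    try:
        handler = _build_handler(loop)
        formatted = handler.format_timestamp()
    finally:
        asyncio.set_event_loop(None)
        loop.close()
    # strptime raises ValueError (failing the test) if the prefix is not a valid date.
    parsed = datetime.strptime(formatted[1:11], "%Y-%m-%d")
    assert parsed.year == datetime.now(timezone.utc).year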
@pytest.mark.asyncio
async def test_start_up_retries_on_abrupt_close(monkeypatch: Any, caplog: Any) -> None:
"""First connection dies with ConnectionClosedError during iteration -> retried.
Second connection iterates cleanly (no events) -> start_up returns without raising.
Ensures handler clears self.connection at the end.
"""
caplog.set_level(logging.WARNING)
    # Use a local Exception subclass as the module's ConnectionClosedError to avoid a websockets dependency
FakeCCE = type("FakeCCE", (Exception,), {})
monkeypatch.setattr(rt_mod, "ConnectionClosedError", FakeCCE)
    # Make asyncio.sleep return immediately so the retry backoff doesn't slow the test
async def _fast_sleep(*_a: Any, **_kw: Any) -> None: return None
monkeypatch.setattr(asyncio, "sleep", _fast_sleep, raising=False)
attempt_counter = {"n": 0}
class FakeConn:
"""Minimal realtime connection stub."""
def __init__(self, mode: str):
self._mode = mode
class _Session:
async def update(self, **_kw: Any) -> None: return None
self.session = _Session()
class _InputAudioBuffer:
async def append(self, **_kw: Any) -> None: return None
self.input_audio_buffer = _InputAudioBuffer()
class _Item:
async def create(self, **_kw: Any) -> None: return None
class _Conversation:
item = _Item()
self.conversation = _Conversation()
class _Response:
async def create(self, **_kw: Any) -> None: return None
async def cancel(self, **_kw: Any) -> None: return None
self.response = _Response()
async def __aenter__(self) -> "FakeConn": return self
async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> bool: return False
async def close(self) -> None: return None
# Async iterator protocol
def __aiter__(self) -> "FakeConn": return self
async def __anext__(self) -> None:
if self._mode == "raise_on_iter":
raise FakeCCE("abrupt close (simulated)")
raise StopAsyncIteration # clean exit (no events)
class FakeRealtime:
def connect(self, **_kw: Any) -> FakeConn:
attempt_counter["n"] += 1
mode = "raise_on_iter" if attempt_counter["n"] == 1 else "clean"
return FakeConn(mode)
class FakeClient:
def __init__(self, **_kw: Any) -> None: self.realtime = FakeRealtime()
# Patch the OpenAI client used by the handler
monkeypatch.setattr(rt_mod, "AsyncOpenAI", FakeClient)
# Build handler with minimal deps
deps = ToolDependencies(reachy_mini=MagicMock(), movement_manager=MagicMock())
handler = rt_mod.OpenaiRealtimeHandler(deps)
# Run: should retry once and exit cleanly
await handler.start_up()
# Validate: two attempts total (fail -> retry -> succeed), and connection cleared
assert attempt_counter["n"] == 2
assert handler.connection is None
# Optional: confirm we logged the unexpected close once
    warnings = [
        r for r in caplog.records
        if r.levelname == "WARNING" and "closed unexpectedly" in r.getMessage()
    ]
assert len(warnings) == 1
# ---- Cost calculation tests ----
def _make_usage(
audio_in: int | None = 0,
text_in: int | None = 0,
image_in: int | None = 0,
audio_out: int | None = 0,
text_out: int | None = 0,
has_input: bool = True,
has_output: bool = True,
) -> MagicMock:
"""Build a fake usage object matching the OpenAI response.usage shape."""
usage = MagicMock()
if has_input:
inp = MagicMock()
inp.audio_tokens = audio_in
inp.text_tokens = text_in
inp.image_tokens = image_in
usage.input_token_details = inp
else:
usage.input_token_details = None
if has_output:
out = MagicMock()
out.audio_tokens = audio_out
out.text_tokens = text_out
usage.output_token_details = out
else:
usage.output_token_details = None
return usage
@pytest.mark.parametrize(
"usage_kwargs, expect_positive",
[
# All token types present → positive cost
({"audio_in": 1000, "text_in": 2000, "image_in": 500, "audio_out": 800, "text_out": 300}, True),
# All None tokens → must not crash
({"audio_in": None, "text_in": None, "image_in": None, "audio_out": None, "text_out": None}, False),
# Mix of None and valid ints
({"audio_in": None, "text_in": 500, "image_in": None, "audio_out": 1000, "text_out": None}, True),
# Missing input/output details entirely
({"has_input": False, "has_output": False}, False),
],
ids=["normal", "all_none", "mixed", "missing_details"],
)
def test_compute_response_cost(usage_kwargs: dict[str, Any], expect_positive: bool) -> None:
"""Verify _compute_response_cost handles various token combinations without crashing."""
usage = _make_usage(**usage_kwargs)
cost = _compute_response_cost(usage)
if expect_positive:
assert cost > 0
else:
assert cost == 0.0
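
# A minimal standalone sketch, assuming the cost is strictly per-token: with the
# helper's all-zero defaults, _compute_response_cost should return 0.0, mirroring
# the "all_none" and "missing_details" parametrized cases above.
def test_compute_response_cost_zero_tokens() -> None:
    """Sketch: zero tokens of every type should cost nothing."""
    usage = _make_usage()  # every token count defaults to 0
    assert _compute_response_cost(usage) == 0.0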