"""Tests for helpers imported from ``utils``, ``app`` and ``modal_backend``."""

import io
import os
import time
import warnings
import wave

import numpy as np
import pytest
from PIL import Image

from app import LANGUAGES, iris_markup, mock_mode_enabled, resolve_question
from utils import (
    WAV_PREFIX,
    bytes_to_wav,
    cleanup_wav,
    image_to_bytes,
    prune_old_wavs,
    safe_call,
)

# Every JPEG stream starts with the two-byte SOI marker.
_JPEG_MAGIC = b"\xff\xd8"


def _make_wav_bytes(framerate: int = 8_000, frame_count: int = 80) -> bytes:
    """Return an in-memory mono, 16-bit WAV holding ``frame_count`` zero frames.

    Args:
        framerate: Sample rate written into the WAV header.
        frame_count: Number of two-byte zero frames to write.

    Returns:
        The complete WAV file contents as bytes.
    """
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(framerate)
        wav_file.writeframes(b"\x00\x00" * frame_count)
    return buffer.getvalue()


class TestImageToBytes:
    """``image_to_bytes`` should emit JPEG bytes for each supported input."""

    def test_accepts_numpy(self):
        image = np.zeros((32, 32, 3), dtype=np.uint8)
        result = image_to_bytes(image)
        assert result.startswith(_JPEG_MAGIC)

    def test_accepts_pil_image(self):
        image = Image.new("RGB", (64, 64), color="red")
        result = image_to_bytes(image)
        assert result.startswith(_JPEG_MAGIC)

    def test_accepts_file_path(self, tmp_path):
        image = Image.new("RGB", (64, 64), color="blue")
        path = tmp_path / "test.jpg"
        image.save(path)
        result = image_to_bytes(str(path))
        assert result.startswith(_JPEG_MAGIC)

    def test_accepts_path_object(self, tmp_path):
        image = Image.new("RGB", (64, 64), color="green")
        path = tmp_path / "test.jpg"
        image.save(path)
        result = image_to_bytes(path)
        assert result.startswith(_JPEG_MAGIC)

    def test_converts_rgba_to_rgb(self):
        # JPEG has no alpha channel, so an RGBA input must still encode.
        image = Image.new("RGBA", (32, 32), color=(255, 0, 0, 128))
        result = image_to_bytes(image)
        assert result.startswith(_JPEG_MAGIC)

    def test_output_is_valid_jpeg(self):
        image = Image.new("RGB", (100, 100), color="white")
        result = image_to_bytes(image)
        assert len(result) > 0
        # Round-trip through PIL to confirm the bytes decode as a JPEG.
        loaded = Image.open(io.BytesIO(result))
        assert loaded.format == "JPEG"

    def test_raises_on_invalid_type(self):
        with pytest.raises(TypeError):
            image_to_bytes(12345)


class TestBytesToWav:
    """``bytes_to_wav`` should persist WAV bytes to a readable file."""

    def test_writes_file(self):
        path = bytes_to_wav(_make_wav_bytes())
        try:
            with wave.open(path, "rb") as wav_file:
                assert wav_file.getframerate() == 8_000
        finally:
            # Remove the file even when the assertion fails.
            cleanup_wav(path)

    def test_raises_on_empty_bytes(self):
        with pytest.raises(ValueError):
            bytes_to_wav(b"")

    def test_creates_valid_wav_file(self):
        path = bytes_to_wav(_make_wav_bytes(framerate=16_000, frame_count=160))
        try:
            assert os.path.exists(path)
            with wave.open(path, "rb") as wav_file:
                assert wav_file.getnchannels() == 1
                assert wav_file.getsampwidth() == 2
        finally:
            cleanup_wav(path)


class TestCleanupWav:
    """``cleanup_wav`` should delete files and tolerate missing ones."""

    def test_removes_existing_file(self):
        path = bytes_to_wav(_make_wav_bytes())
        assert os.path.exists(path)
        cleanup_wav(path)
        assert not os.path.exists(path)

    def test_handles_none(self):
        # Must not raise.
        cleanup_wav(None)

    def test_handles_nonexistent_file(self):
        # Must not raise.
        cleanup_wav("/nonexistent/path/file.wav")


class TestStitchOverlappingText:
    """``stitch_overlapping_text`` should merge parts and drop repeated overlap."""

    def _stitch(self, *args, **kwargs):
        """Forward all arguments to ``modal_backend.stitch_overlapping_text``."""
        # Imported lazily; presumably so collecting this module does not
        # require ``modal_backend`` to be importable -- TODO confirm.
        from modal_backend import stitch_overlapping_text

        return stitch_overlapping_text(*args, **kwargs)

    def test_removes_overlap_case_insensitive(self):
        parts = [
            "RIVER CAFE SOUP $6 VEGETABLE CURRY $14",
            "Vegetable Curry $14 Grilled Fish $18 Mango Lassi $5",
        ]
        assert self._stitch(parts) == (
            "RIVER CAFE SOUP $6 VEGETABLE CURRY $14 Grilled Fish $18 Mango Lassi $5"
        )

    def test_no_overlap_concatenates(self):
        assert self._stitch(["alpha beta", "gamma delta"]) == "alpha beta gamma delta"

    def test_single_part(self):
        assert self._stitch(["only text here"]) == "only text here"

    def test_empty_and_blank_parts_ignored(self):
        assert self._stitch(["", " ", "real text"]) == "real text"
        assert self._stitch([]) == ""

    def test_full_duplicate_collapses(self):
        assert self._stitch(["take one capsule", "take one capsule"]) == "take one capsule"


class TestPruneOldWavs:
    """``prune_old_wavs`` should remove stale WAV files and keep fresh ones."""

    def _make_wav_bytes(self) -> bytes:
        """Return the default silent WAV fixture built by the module helper."""
        return _make_wav_bytes()

    def test_uses_recognizable_prefix(self):
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            assert WAV_PREFIX in os.path.basename(path)
        finally:
            cleanup_wav(path)

    def test_removes_stale_files(self):
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            # Backdate the timestamps well past the 600 s threshold.
            old = time.time() - 10_000
            os.utime(path, (old, old))
            removed = prune_old_wavs(max_age_seconds=600)
            assert removed >= 1
            assert not os.path.exists(path)
        finally:
            # Prevents a leaked file if pruning did not delete it.
            cleanup_wav(path)

    def test_keeps_fresh_files(self):
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            prune_old_wavs(max_age_seconds=600)
            assert os.path.exists(path)
        finally:
            cleanup_wav(path)


class TestSafeCall:
    """``safe_call`` should return the result, or the fallback on error."""

    def test_returns_fallback(self):
        def fail():
            raise RuntimeError("expected")

        # Silence UserWarning; presumably safe_call warns on failure -- TODO confirm.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            assert safe_call(fail, fallback="fallback") == "fallback"

    def test_returns_success(self):
        def succeed():
            return "ok"

        assert safe_call(succeed) == "ok"

    def test_passes_args_and_kwargs(self):
        def add(a, b, extra=0):
            return a + b + extra

        assert safe_call(add, 1, 2, extra=3) == 6


class TestIrisMarkup:
    """``iris_markup`` output should contain the state class and the label."""

    def test_exposes_state_label(self):
        markup = iris_markup("seeing", "Seeing")
        assert "iris seeing" in markup
        assert "Seeing" in markup

    def test_idle_state(self):
        markup = iris_markup("idle", "Ready")
        assert "iris idle" in markup

    def test_speaking_state(self):
        markup = iris_markup("speaking", "Speaking")
        assert "iris speaking" in markup


class TestLanguages:
    """``LANGUAGES`` should map display names to language codes."""

    def test_english(self):
        assert LANGUAGES["English"] == "en"

    def test_chinese(self):
        assert LANGUAGES["Chinese"] == "zh"


class TestMockMode:
    """``mock_mode_enabled`` under different ``THIRD_EYE_MOCK`` settings."""

    def test_can_be_forced(self, monkeypatch):
        monkeypatch.setenv("THIRD_EYE_MOCK", "true")
        assert mock_mode_enabled() is True

    def test_can_be_disabled(self, monkeypatch):
        monkeypatch.setenv("THIRD_EYE_MOCK", "false")
        monkeypatch.setenv("MODAL_TOKEN_ID", "fake")
        monkeypatch.setenv("MODAL_TOKEN_SECRET", "fake")
        assert mock_mode_enabled() is False

    def test_auto_enables_without_tokens(self, monkeypatch):
        monkeypatch.delenv("MODAL_TOKEN_ID", raising=False)
        monkeypatch.delenv("MODAL_TOKEN_SECRET", raising=False)
        monkeypatch.setenv("THIRD_EYE_MOCK", "auto")
        assert mock_mode_enabled() is True


class TestResolveQuestion:
    """``resolve_question`` should pick the prompt for each mode and input."""

    def test_read_text_mode(self):
        result = resolve_question("Read Text", None, "", "en")
        assert "Read every word" in result
        assert "verbatim" in result

    def test_describe_mode(self):
        result = resolve_question("Describe", None, "", "en")
        assert "Describe" in result
        assert "blind" in result

    def test_ask_with_typed_question(self):
        result = resolve_question("Ask", None, "What color is this?", "en")
        assert result == "What color is this?"

    def test_ask_without_input(self):
        result = resolve_question("Ask", None, "", "en")
        assert "image" in result.lower()

    def test_typed_question_overrides_audio(self):
        # The second argument looks like an audio path -- TODO confirm.
        result = resolve_question("Ask", "/some/audio.wav", "typed question", "en")
        assert result == "typed question"