Spaces:
Sleeping
Sleeping
| import io | |
| import os | |
| import time | |
| import warnings | |
| import wave | |
| import numpy as np | |
| import pytest | |
| from PIL import Image | |
| from app import LANGUAGES, iris_markup, mock_mode_enabled, resolve_question | |
| from utils import ( | |
| WAV_PREFIX, | |
| bytes_to_wav, | |
| cleanup_wav, | |
| image_to_bytes, | |
| prune_old_wavs, | |
| safe_call, | |
| ) | |
class TestImageToBytes:
    """Tests for ``image_to_bytes``: each supported input must yield JPEG bytes."""

    # Every JPEG stream starts with the Start-Of-Image marker (FF D8).
    JPEG_MAGIC = b"\xff\xd8"

    def test_accepts_numpy(self):
        """A 32x32x3 uint8 array is encoded to JPEG."""
        image = np.zeros((32, 32, 3), dtype=np.uint8)
        result = image_to_bytes(image)
        assert result.startswith(self.JPEG_MAGIC)

    def test_accepts_pil_image(self):
        """A PIL RGB image is encoded to JPEG."""
        image = Image.new("RGB", (64, 64), color="red")
        result = image_to_bytes(image)
        assert result.startswith(self.JPEG_MAGIC)

    def test_accepts_file_path(self, tmp_path):
        """A filesystem path given as ``str`` is accepted."""
        image = Image.new("RGB", (64, 64), color="blue")
        path = tmp_path / "test.jpg"
        image.save(path)
        result = image_to_bytes(str(path))
        assert result.startswith(self.JPEG_MAGIC)

    def test_accepts_path_object(self, tmp_path):
        """A filesystem path given as a ``pathlib.Path`` is accepted."""
        image = Image.new("RGB", (64, 64), color="green")
        path = tmp_path / "test.jpg"
        image.save(path)
        result = image_to_bytes(path)
        assert result.startswith(self.JPEG_MAGIC)

    def test_converts_rgba_to_rgb(self):
        """An RGBA input still produces a JPEG, and it decodes as RGB."""
        image = Image.new("RGBA", (32, 32), color=(255, 0, 0, 128))
        result = image_to_bytes(image)
        assert result.startswith(self.JPEG_MAGIC)
        # Decode the output so the test checks the conversion its name
        # promises, not merely that some JPEG came back.
        with Image.open(io.BytesIO(result)) as decoded:
            assert decoded.mode == "RGB"

    def test_output_is_valid_jpeg(self):
        """The returned bytes are non-empty and PIL identifies them as JPEG."""
        image = Image.new("RGB", (100, 100), color="white")
        result = image_to_bytes(image)
        assert len(result) > 0
        with Image.open(io.BytesIO(result)) as loaded:
            assert loaded.format == "JPEG"

    def test_raises_on_invalid_type(self):
        """An unsupported input type (here an ``int``) must raise ``TypeError``."""
        with pytest.raises(TypeError):
            image_to_bytes(12345)
class TestBytesToWav:
    """Tests for ``bytes_to_wav``, which writes WAV bytes to a file and returns its path."""

    def _make_wav_bytes(self, framerate: int, frame_count: int) -> bytes:
        """Build an in-memory mono, 16-bit WAV holding ``frame_count`` silent frames."""
        source = io.BytesIO()
        with wave.open(source, "wb") as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(framerate)
            # One mono 16-bit frame is two bytes.
            wav_file.writeframes(b"\x00\x00" * frame_count)
        return source.getvalue()

    def test_writes_file(self):
        """The written file can be reopened and keeps the source frame rate."""
        path = bytes_to_wav(self._make_wav_bytes(framerate=8_000, frame_count=80))
        try:
            with wave.open(path, "rb") as wav_file:
                assert wav_file.getframerate() == 8_000
        finally:
            # Always delete the file bytes_to_wav created, pass or fail.
            cleanup_wav(path)

    def test_raises_on_empty_bytes(self):
        """Empty input must raise ``ValueError``."""
        with pytest.raises(ValueError):
            bytes_to_wav(b"")

    def test_creates_valid_wav_file(self):
        """The file exists on disk and keeps the channel count and sample width."""
        path = bytes_to_wav(self._make_wav_bytes(framerate=16_000, frame_count=160))
        try:
            assert os.path.exists(path)
            with wave.open(path, "rb") as wav_file:
                assert wav_file.getnchannels() == 1
                assert wav_file.getsampwidth() == 2
        finally:
            # Always delete the file bytes_to_wav created, pass or fail.
            cleanup_wav(path)
class TestCleanupWav:
    """Tests for ``cleanup_wav``: deletes a WAV file and tolerates bad input."""

    def test_removes_existing_file(self, tmp_path):
        """A file produced by ``bytes_to_wav`` is gone after cleanup."""
        silence = b"\x00\x00" * 80
        buffer = io.BytesIO()
        with wave.open(buffer, "wb") as writer:
            writer.setnchannels(1)
            writer.setsampwidth(2)
            writer.setframerate(8_000)
            writer.writeframes(silence)

        wav_path = bytes_to_wav(buffer.getvalue())
        # Precondition: the file must exist for the removal check to mean anything.
        assert os.path.exists(wav_path)

        cleanup_wav(wav_path)

        assert not os.path.exists(wav_path)

    def test_handles_none(self):
        """Passing ``None`` must not raise."""
        cleanup_wav(None)

    def test_handles_nonexistent_file(self):
        """Passing a path that does not exist must not raise."""
        cleanup_wav("/nonexistent/path/file.wav")
class TestStitchOverlappingText:
    """Tests for ``modal_backend.stitch_overlapping_text``."""

    def _stitch(self, *args, **kwargs):
        """Forward all arguments to ``stitch_overlapping_text``."""
        # Imported at call time rather than at module import.
        # NOTE(review): presumably so the rest of this file does not need
        # modal_backend to be importable - confirm.
        from modal_backend import stitch_overlapping_text as stitch

        return stitch(*args, **kwargs)

    def test_removes_overlap_case_insensitive(self):
        """Overlap at the seam is dropped even when the two parts differ in case."""
        first = "RIVER CAFE SOUP $6 VEGETABLE CURRY $14"
        second = "Vegetable Curry $14 Grilled Fish $18 Mango Lassi $5"
        expected = (
            "RIVER CAFE SOUP $6 VEGETABLE CURRY $14 Grilled Fish $18 Mango Lassi $5"
        )
        assert self._stitch([first, second]) == expected

    def test_no_overlap_concatenates(self):
        """Parts with nothing in common are joined with a single space."""
        stitched = self._stitch(["alpha beta", "gamma delta"])
        assert stitched == "alpha beta gamma delta"

    def test_single_part(self):
        """A single part comes back unchanged."""
        stitched = self._stitch(["only text here"])
        assert stitched == "only text here"

    def test_empty_and_blank_parts_ignored(self):
        """Empty and whitespace-only parts add nothing; an empty list gives ``""``."""
        with_blanks = self._stitch(["", " ", "real text"])
        assert with_blanks == "real text"
        nothing = self._stitch([])
        assert nothing == ""

    def test_full_duplicate_collapses(self):
        """Two identical parts collapse into one copy."""
        stitched = self._stitch(["take one capsule", "take one capsule"])
        assert stitched == "take one capsule"
class TestPruneOldWavs:
    """Tests for ``prune_old_wavs`` and the file naming it relies on."""

    def _make_wav_bytes(self) -> bytes:
        """Return a small in-memory WAV: mono, 16-bit, 8 kHz, 80 silent frames."""
        source = io.BytesIO()
        with wave.open(source, "wb") as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(8_000)
            wav_file.writeframes(b"\x00\x00" * 80)
        return source.getvalue()

    def test_uses_recognizable_prefix(self):
        """Files written by ``bytes_to_wav`` carry ``WAV_PREFIX`` in their basename."""
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            assert WAV_PREFIX in os.path.basename(path)
        finally:
            cleanup_wav(path)

    def test_removes_stale_files(self):
        """A file whose timestamps are older than ``max_age_seconds`` is pruned."""
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            # Back-date atime and mtime well past the 600 s threshold used below.
            old = time.time() - 10_000
            os.utime(path, (old, old))
            removed = prune_old_wavs(max_age_seconds=600)
            # ">= 1" rather than "== 1": the call may also remove other stale files.
            assert removed >= 1
            assert not os.path.exists(path)
        finally:
            # If an assertion above fails the file may still exist; remove it so
            # a failing run does not leave it behind. When pruning succeeded the
            # path is already gone, a case the cleanup_wav tests in this file
            # expect to be tolerated.
            cleanup_wav(path)

    def test_keeps_fresh_files(self):
        """A file that was just created survives pruning."""
        path = bytes_to_wav(self._make_wav_bytes())
        try:
            prune_old_wavs(max_age_seconds=600)
            assert os.path.exists(path)
        finally:
            cleanup_wav(path)
class TestSafeCall:
    """Tests for ``safe_call``: runs a callable and falls back when it raises."""

    def test_returns_fallback(self):
        """When the callable raises, the supplied ``fallback`` is returned."""

        def fail():
            raise RuntimeError("expected")

        # Silence any UserWarning raised while the failure is handled.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            outcome = safe_call(fail, fallback="fallback")
            assert outcome == "fallback"

    def test_returns_success(self):
        """When the callable succeeds, its return value is passed through."""

        def succeed():
            return "ok"

        outcome = safe_call(succeed)
        assert outcome == "ok"

    def test_passes_args_and_kwargs(self):
        """Positional and keyword arguments are forwarded to the callable."""

        def add(a, b, extra=0):
            return a + b + extra

        total = safe_call(add, 1, 2, extra=3)
        assert total == 6
class TestIrisMarkup:
    """Tests for ``iris_markup``: the output must embed the state and its label."""

    def test_exposes_state_label(self):
        """Both the ``iris <state>`` token and the label text appear in the output."""
        rendered = iris_markup("seeing", "Seeing")
        for fragment in ("iris seeing", "Seeing"):
            assert fragment in rendered

    def test_idle_state(self):
        """The idle state is rendered as ``iris idle``."""
        rendered = iris_markup("idle", "Ready")
        assert "iris idle" in rendered

    def test_speaking_state(self):
        """The speaking state is rendered as ``iris speaking``."""
        rendered = iris_markup("speaking", "Speaking")
        assert "iris speaking" in rendered
class TestLanguages:
    """Tests for the ``LANGUAGES`` mapping from display name to language code."""

    def test_english(self):
        """``English`` maps to ``en``."""
        code = LANGUAGES["English"]
        assert code == "en"

    def test_chinese(self):
        """``Chinese`` maps to ``zh``."""
        code = LANGUAGES["Chinese"]
        assert code == "zh"
class TestMockMode:
    """Tests for ``mock_mode_enabled`` under different environment settings."""

    def test_can_be_forced(self, monkeypatch):
        """``THIRD_EYE_MOCK=true`` turns mock mode on."""
        monkeypatch.setenv("THIRD_EYE_MOCK", "true")
        enabled = mock_mode_enabled()
        assert enabled is True

    def test_can_be_disabled(self, monkeypatch):
        """``THIRD_EYE_MOCK=false`` with both Modal tokens set turns mock mode off."""
        monkeypatch.setenv("THIRD_EYE_MOCK", "false")
        for token_var in ("MODAL_TOKEN_ID", "MODAL_TOKEN_SECRET"):
            monkeypatch.setenv(token_var, "fake")
        enabled = mock_mode_enabled()
        assert enabled is False

    def test_auto_enables_without_tokens(self, monkeypatch):
        """``THIRD_EYE_MOCK=auto`` with no Modal tokens turns mock mode on."""
        for token_var in ("MODAL_TOKEN_ID", "MODAL_TOKEN_SECRET"):
            # raising=False: the variable may already be absent.
            monkeypatch.delenv(token_var, raising=False)
        monkeypatch.setenv("THIRD_EYE_MOCK", "auto")
        enabled = mock_mode_enabled()
        assert enabled is True
class TestResolveQuestion:
    """Tests for ``resolve_question`` across the Read Text, Describe and Ask modes."""

    def test_read_text_mode(self):
        """Read Text mode returns a prompt asking for a verbatim reading."""
        prompt = resolve_question("Read Text", None, "", "en")
        for phrase in ("Read every word", "verbatim"):
            assert phrase in prompt

    def test_describe_mode(self):
        """Describe mode returns a prompt containing ``Describe`` and ``blind``."""
        prompt = resolve_question("Describe", None, "", "en")
        for phrase in ("Describe", "blind"):
            assert phrase in prompt

    def test_ask_with_typed_question(self):
        """In Ask mode a typed question is returned unchanged."""
        typed = "What color is this?"
        prompt = resolve_question("Ask", None, typed, "en")
        assert prompt == "What color is this?"

    def test_ask_without_input(self):
        """In Ask mode with no audio and no text, the result mentions the image."""
        prompt = resolve_question("Ask", None, "", "en")
        assert "image" in prompt.lower()

    def test_typed_question_overrides_audio(self):
        """When both audio and typed text are given, the typed text is returned."""
        prompt = resolve_question("Ask", "/some/audio.wav", "typed question", "en")
        assert prompt == "typed question"