| from __future__ import annotations
|
|
|
| import math
|
| import os
|
| import struct
|
| import tempfile
|
| import wave
|
| from pathlib import Path
|
|
|
| import gradio as gr
|
|
|
| from utils import (
|
| bytes_to_wav,
|
| image_to_bytes,
|
| prune_old_wavs,
|
| read_audio_bytes,
|
| safe_call,
|
| )
|
|
|
# Filesystem anchors: assets are resolved relative to this module's folder.
ROOT = Path(__file__).parent
ASSETS = ROOT / "assets"

# Stylesheet text, read once at import time.
CSS = ASSETS.joinpath("custom.css").read_text(encoding="utf-8")

# Name of the Modal app in which remote functions are looked up.
MODAL_APP = os.environ.get("THIRD_EYE_MODAL_APP", "third-eye-backend")

# Display name -> language code handed to the inference functions.
LANGUAGES = dict(English="en", Chinese="zh")

# Language codes the speech-to-text step is asked to handle directly.
STT_LANGUAGES = {"zh", "en"}

# Bundled example images offered next to the capture widget.
SAMPLES = [
    str(ASSETS / f"sample_{subject}.jpg") for subject in ("menu", "label", "sign")
]
|
|
|
|
|
def iris_markup(state: str, label: str) -> str:
    """Render the iris status widget as an HTML fragment.

    Args:
        state: CSS modifier class placed next to ``iris`` on the animated
            element (callers in this file pass values such as "idle",
            "listening", "seeing", "thinking" and "speaking").
        label: Human-readable status shown in the ``state-label`` paragraph.

    Returns:
        The ``<section class="iris-stage">`` markup. Both values are
        HTML-escaped so text containing ``<``, ``&`` or quotes cannot break
        out of the attribute or element it is inserted into.
    """
    from html import escape

    safe_state = escape(state, quote=True)
    safe_label = escape(label, quote=True)
    return f"""
<section class="iris-stage" aria-label="Third Eye status">
<div class="iris {safe_state}" aria-hidden="true">
<span class="iris-core"></span>
<span class="scan-line"></span>
</div>
<p class="state-label">{safe_label}</p>
</section>
"""
|
|
|
|
|
def _backend_pref() -> str:
    """Return the THIRD_EYE_BACKEND setting, trimmed and lower-cased.

    Falls back to ``"auto"`` when the variable is not set.
    """
    raw_choice = os.environ.get("THIRD_EYE_BACKEND", "auto")
    trimmed = raw_choice.strip()
    return trimmed.lower()
|
|
|
|
|
def zerogpu_available() -> bool:
    """Tell whether inference should be served by the in-process ZeroGPU backend.

    ``THIRD_EYE_BACKEND=zerogpu`` forces True and ``modal`` / ``mock`` force
    False. For any other setting the answer is whether the Hugging Face
    ``spaces`` package can be imported.
    """
    choice = _backend_pref()
    if choice in {"modal", "mock"}:
        return False
    if choice != "zerogpu":
        # No explicit preference: probe for the ``spaces`` runtime.
        try:
            import spaces
        except Exception:
            return False
    return True
|
|
|
|
|
def modal_available() -> bool:
    """Tell whether the Modal backend should be used.

    ``THIRD_EYE_BACKEND=modal`` forces True and ``zerogpu`` / ``mock`` force
    False. Otherwise Modal counts as available only when both
    ``MODAL_TOKEN_ID`` and ``MODAL_TOKEN_SECRET`` are set to non-empty values.
    """
    forced = {"modal": True, "zerogpu": False, "mock": False}
    choice = _backend_pref()
    if choice in forced:
        return forced[choice]
    credentials = ("MODAL_TOKEN_ID", "MODAL_TOKEN_SECRET")
    return all(os.getenv(variable) for variable in credentials)
|
|
|
|
|
def mock_mode_enabled() -> bool:
    """Decide whether the app answers with canned preview output.

    ``THIRD_EYE_MOCK`` wins when it holds an explicit on/off word; any other
    value (including the default ``"auto"``) enables mock mode only when
    neither the ZeroGPU nor the Modal backend is available.
    """
    on_words = ("1", "true", "yes", "on")
    off_words = ("0", "false", "no", "off")

    flag = os.getenv("THIRD_EYE_MOCK", "auto").strip().lower()
    if flag in on_words:
        return True
    if flag in off_words:
        return False
    return not zerogpu_available() and not modal_available()
|
|
|
|
|
def modal_call(function_name: str, *args):
    """Invoke ``function_name`` from the MODAL_APP Modal app and return its result.

    ``modal`` is imported lazily so the module loads without it installed.
    """
    import modal

    return modal.Function.from_name(MODAL_APP, function_name).remote(*args)
|
|
|
|
|
def infer(function_name: str, *args):
    """Dispatch one inference call to whichever backend is active.

    When ZeroGPU is available the function of that name is looked up on the
    ``zerogpu_backend`` module and called in-process; otherwise the call goes
    through ``modal_call``.
    """
    if not zerogpu_available():
        return modal_call(function_name, *args)

    import zerogpu_backend

    handler = getattr(zerogpu_backend, function_name)
    return handler(*args)
|
|
|
|
|
def backend_status_text() -> str:
    """Return a one-sentence description of where inference currently runs."""
    if not mock_mode_enabled():
        if zerogpu_available():
            return "Live on Hugging Face ZeroGPU. Models load on first use."
        return "Live inference backend connected."

    preview_note = (
        "Preview mode — the full interface runs without a GPU backend, "
        "so your image is never uploaded."
    )
    return preview_note
|
|
|
|
|
def mock_answer(mode: str, language: str) -> str:
    """Return the canned preview answer for ``mode``.

    The sentence is followed by a note naming the output ``language``.
    Raises ``KeyError`` for a mode other than "Describe", "Ask" or
    "Read Text".
    """
    previews = dict(
        [
            (
                "Describe",
                "Mock preview: I can see a clear, text-rich image ready for visual "
                "description. Deploy the Modal backend for a real model response.",
            ),
            (
                "Ask",
                "Mock preview: your spoken or typed question was received. Deploy the "
                "Modal backend to answer it from the image.",
            ),
            (
                "Read Text",
                "Mock preview: the image is ready for OCR. Deploy the Modal backend to "
                "read its exact text aloud.",
            ),
        ]
    )
    preview = previews[mode]
    return "{} Output language: {}.".format(preview, language)
|
|
|
|
|
def mock_tone() -> str:
    """Synthesize a short two-tone chime and return the path of its WAV file.

    The clip is 0.55 s of mono 16-bit PCM at 22 050 Hz: a 523.25 Hz sine plus
    a quieter 659.25 Hz sine, shaped by a linear fade-in and fade-out.

    Returns:
        Path of a newly created ``.wav`` temporary file. The file is not
        deleted here; removing it is left to the caller.
    """
    sample_rate = 22_050
    duration = 0.55
    total = sample_rate * duration
    frame_count = int(total)

    samples = []
    for index in range(frame_count):
        # Ramp up over the first 500 samples and down over the last 800.
        envelope = min(1.0, index / 500, (total - index) / 800)
        samples.append(
            int(
                8_000
                * envelope
                * (
                    math.sin(2 * math.pi * 523.25 * index / sample_rate)
                    + 0.45 * math.sin(2 * math.pi * 659.25 * index / sample_rate)
                )
            )
        )
    # Pack every sample in one call as little-endian signed 16-bit values.
    frames = struct.pack(f"<{frame_count}h", *samples)

    # Only the unique name is needed; close the handle right away so no file
    # descriptor leaks and the path can be reopened for writing below.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as output:
        path = output.name

    with wave.open(path, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(frames)
    return path
|
|
|
|
|
def warmup_all(progress=gr.Progress()):
    """Call each backend model once so later requests do not pay load time.

    Calls go through ``infer`` and therefore reach whichever backend is
    active. In preview (mock) mode the user-facing pipeline never calls a
    backend, so nothing is warmed up and a short notice is returned instead
    of per-model results.

    Args:
        progress: Gradio progress tracker updated before each step.

    Returns:
        One line per model with its outcome and elapsed seconds, joined by
        newlines, or the preview-mode notice.
    """
    import time

    if mock_mode_enabled():
        # Without this guard every step would fail against a missing backend
        # and still be reported as "loaded".
        return (
            "Preview mode: no inference backend is configured, "
            "so there is nothing to pre-load."
        )

    # (label, backend function, *placeholder arguments)
    steps = [
        ("Vision model", "describe_scene", b"\xff\xd8test", "Say hello.", "en", False),
        ("TTS model", "speak", "Warmup complete.", "en"),
        ("STT model", "transcribe_audio", b"\xff\xd8test", "en"),
    ]

    results = []
    for i, (label, func_name, *args) in enumerate(steps):
        progress(i / len(steps), desc=f"Warming up {label}...")
        # Monotonic clock: the elapsed time is unaffected by wall-clock changes.
        started = time.monotonic()
        try:
            infer(func_name, *args)
        except Exception:
            # NOTE(review): the payloads are placeholders, so a backend may
            # reject them after loading its model; presumably that is why a
            # failure is still reported as "loaded". An unreachable backend
            # is reported the same way.
            elapsed = time.monotonic() - started
            results.append(f"{label}: loaded ({elapsed:.0f}s)")
        else:
            elapsed = time.monotonic() - started
            results.append(f"{label}: ready ({elapsed:.0f}s)")

    progress(1.0, desc="All models warm!")
    return "\n".join(results)
|
|
|
|
|
def resolve_question(
    mode: str,
    audio_path: str | None,
    typed_question: str | None,
    language: str,
) -> str:
    """Work out the instruction that is sent to the vision model.

    Args:
        mode: "Read Text" and "Describe" each return a fixed instruction;
            any other mode is handled as a user question.
        audio_path: Path of a recorded question, used only when nothing was
            typed. May be None.
        typed_question: Typed question. None or whitespace-only text counts
            as "nothing typed".
        language: Language code requested for transcription; codes outside
            STT_LANGUAGES are transcribed as English after a UI warning.

    Returns:
        The fixed instruction, the stripped typed question, a generic default
        question when there is neither text nor audio, or the transcription
        of the recording ("" when transcription fails).
    """
    if mode == "Read Text":
        return (
            "Read every word and number in this image exactly as written. "
            "Include all text, labels, prices, dates, directions, and signs. "
            "Do not interpret or explain — just read the text verbatim."
        )
    if mode == "Describe":
        return (
            "Describe this image for a blind person. Read any text you see word by word. "
            "Describe objects, people, colors, layout, and all visible details."
        )

    # A textbox value can arrive as None; treat it like an empty string
    # instead of failing on ``None.strip()``.
    typed = (typed_question or "").strip()
    if typed:
        return typed
    if not audio_path:
        return (
            "What is in this image? Read any text, labels, or writing visible. "
            "Describe objects, brands, colors, and details."
        )

    audio_bytes = read_audio_bytes(audio_path)
    stt_language = language if language in STT_LANGUAGES else "en"
    if stt_language != language:
        gr.Warning(
            "Speech recognition does not support this selected language. "
            "Listening as English; the answer can still use your selected language."
        )
    return safe_call(
        infer,
        "transcribe_audio",
        audio_bytes,
        stt_language,
        fallback="",
        warn="I could not hear that. Type a question or record again.",
    ) or ""
|
|
|
|
|
def run_pipeline(
    image,
    audio_path: str | None,
    typed_question: str | None,
    mode: str,
    language_name: str,
    progress=gr.Progress(),
):
    """Run one request end to end, streaming UI updates after each stage.

    This is a generator. Every yielded 5-tuple is
    ``(audio, answer, question, iris_html, status)``: the path of the spoken
    answer (or None), the answer transcript, the resolved question or
    instruction, the iris widget markup, and the live status sentence.

    Args:
        image: Captured or uploaded picture; None ends the run with a warning.
        audio_path: Path of a recorded question, or None.
        typed_question: Typed question; None is treated as an empty string.
        mode: "Describe", "Ask" or "Read Text".
        language_name: Display name looked up in LANGUAGES; unknown names
            fall back to "en".
        progress: Gradio progress tracker.
    """
    prune_old_wavs()

    # A textbox value can arrive as None; normalise once so the ``.strip()``
    # calls below cannot raise AttributeError.
    typed_question = typed_question or ""

    if image is None:
        gr.Warning("No image captured. Point the camera or choose an example.")
        yield (
            None,
            "No image captured.",
            typed_question,
            iris_markup("idle", "Waiting for an image"),
            "Waiting for an image.",
        )
        return

    language = LANGUAGES.get(language_name, "en")
    using_mock = mock_mode_enabled()

    # Show the "listening" state only when a recording will really be transcribed.
    if mode == "Ask" and audio_path and not typed_question.strip() and not using_mock:
        yield (
            None,
            "",
            "",
            iris_markup("listening", "Listening"),
            "Listening to your question.",
        )
        progress(0.15, desc="Transcribing your question")

    if using_mock and mode == "Ask":
        # Preview mode cannot transcribe audio, so use the typed text or a stock question.
        question = typed_question.strip() or "What is in front of me?"
    else:
        question = resolve_question(mode, audio_path, typed_question, language)

    if mode == "Ask" and not question:
        yield (
            None,
            "I could not understand the question.",
            "",
            iris_markup("idle", "Ready to try again"),
            "Question not understood. Type it or record again.",
        )
        return

    yield (
        None,
        "",
        question,
        iris_markup("seeing", "Seeing"),
        "Analyzing the captured image.",
    )
    progress(0.35, desc="Loading vision model" if not using_mock else "Previewing")

    if using_mock:
        answer = mock_answer(mode, language_name)
    else:
        answer = safe_call(
            infer,
            "describe_scene",
            image_to_bytes(image),
            question,
            language,
            mode == "Read Text",
            fallback="",
            warn="The vision model is unavailable. Please try once more.",
        ) or ""

    if not answer:
        yield (
            None,
            "Could not analyze the image.",
            question,
            iris_markup("idle", "Ready to try again"),
            "Image analysis failed. Ready to try again.",
        )
        return

    # Publish the text answer before speech synthesis starts.
    yield (
        None,
        answer,
        question,
        iris_markup("thinking", "Preparing voice"),
        "The answer is ready. Preparing speech.",
    )
    progress(0.75, desc="Preparing spoken answer")

    if using_mock:
        audio_path_out = mock_tone()
    else:
        audio_bytes = safe_call(
            infer,
            "speak",
            answer,
            language,
            fallback=None,
            warn="Voice is unavailable. The large-text answer is still shown.",
        )
        audio_path_out = bytes_to_wav(audio_bytes) if audio_bytes else None

    progress(1.0, desc="Ready")
    # Without audio the answer is still shown, only the iris state differs.
    final_state = "speaking" if audio_path_out else "idle"
    final_label = "Speaking" if audio_path_out else "Answer shown"
    yield (
        audio_path_out,
        answer,
        question,
        iris_markup(final_state, final_label),
        f"{final_label}. The transcript is available below.",
    )
|
|
|
|
|
# Caption shown on the primary action button for each interaction mode.
MODE_LABELS = dict(
    zip(
        ("Describe", "Ask", "Read Text"),
        ("Describe what I see", "Ask Third Eye", "Read this text"),
    )
)

# Client-side handler: toggles the ``force-dark`` class on the root element.
THEME_TOGGLE_JS = (
    "\n"
    "() => { document.documentElement.classList.toggle('force-dark'); }\n"
)

# Header block holding the logo mark, the product name and the tagline.
BRAND_HTML = (
    "\n"
    '<div class="brand">\n'
    '<span class="brand-iris" aria-hidden="true"><span class="brand-iris-core"></span></span>\n'
    '<div class="brand-text">\n'
    "<h1>Third Eye</h1>\n"
    "<p>Point your camera. Ask out loud. Listen to the answer.</p>\n"
    "</div>\n"
    "</div>\n"
)
|
|
|
|
|
def on_mode_change(mode: str):
    """Build the three component updates that follow a mode switch.

    The first two updates show their target only in "Ask" mode; the third
    sets a value taken from MODE_LABELS, using the "Describe" entry for an
    unknown mode.
    """
    voice_controls_visible = mode == "Ask"
    button_caption = MODE_LABELS.get(mode, MODE_LABELS["Describe"])

    first_visibility = gr.update(visible=voice_controls_visible)
    second_visibility = gr.update(visible=voice_controls_visible)
    return first_visibility, second_visibility, gr.update(value=button_caption)
|
|
|
|
|
def build_demo() -> gr.Blocks:
    """Assemble the Third Eye interface and wire its event handlers.

    Creates, in order: a header row (brand, language picker, theme toggle),
    the mode selector, a two-column work area (capture inputs on one side,
    answer outputs on the other), the diagnostics accordion and a footer
    note. Then connects the mode selector, the submit button, the warm-up
    button and the theme toggle to their handlers.

    Returns:
        The constructed ``gr.Blocks`` app; it is not launched here.
    """
    theme = gr.themes.Base(
        primary_hue="indigo",
        secondary_hue="cyan",
        neutral_hue="slate",
    )
    with gr.Blocks(
        theme=theme,
        css=CSS,
        title="Third Eye",
        fill_width=True,
    ) as demo:
        # Header: brand block, answer-language picker and theme toggle.
        with gr.Row(elem_classes="app-header"):
            gr.HTML(BRAND_HTML)
            language = gr.Dropdown(
                choices=list(LANGUAGES),
                value="English",
                label="Answer language",
                show_label=False,
                elem_classes="language-picker",
                scale=0,
            )
            theme_btn = gr.Button(
                "◐ Theme",
                size="sm",
                elem_classes="theme-toggle",
                scale=0,
            )

        # Mode selector; its change event toggles the voice controls below.
        mode = gr.Radio(
            choices=["Describe", "Ask", "Read Text"],
            value="Describe",
            show_label=False,
            container=False,
            elem_id="mode-cards",
            elem_classes="mode-cards",
        )

        with gr.Row(equal_height=True, elem_classes="work-area"):
            # Capture card: image source, examples, optional question inputs.
            with gr.Column(scale=1, elem_classes="glass-card capture-card"):
                gr.HTML(
                    '<div class="card-head"><span class="card-title">CAPTURE</span>'
                    '<span class="card-hint">Camera or upload</span></div>'
                )
                image = gr.Image(
                    label="Camera or image",
                    sources=["webcam", "upload"],
                    type="pil",
                    height=380,
                    show_label=False,
                    elem_classes="camera-frame",
                )
                gr.Examples(
                    examples=[[sample] for sample in SAMPLES],
                    inputs=[image],
                    label="Or try an example",
                )
                # Both question inputs start hidden; on_mode_change reveals
                # them when the "Ask" mode is selected.
                audio_input = gr.Audio(
                    label="Speak your question",
                    sources=["microphone", "upload"],
                    type="filepath",
                    format="wav",
                    visible=False,
                    elem_classes="mic-input",
                )
                typed = gr.Textbox(
                    label="Type instead",
                    placeholder="Optional: type only if the microphone is unavailable.",
                    visible=False,
                    lines=2,
                )
                submit = gr.Button(
                    MODE_LABELS["Describe"],
                    variant="primary",
                    size="lg",
                    elem_classes="primary-action",
                )

            # Answer card: status widget, transcript and spoken answer.
            with gr.Column(scale=1, elem_classes="glass-card answer-card"):
                gr.HTML('<div class="card-head"><span class="card-title">ANSWER</span></div>')
                iris = gr.HTML(
                    iris_markup("idle", "Ready"),
                    elem_id="iris-shell",
                )
                status = gr.Textbox(
                    value="Ready. Capture an image or choose an example.",
                    label="Live status",
                    interactive=False,
                    elem_id="live-status",
                    elem_classes="sr-status",
                )
                question_output = gr.Textbox(
                    label="Question or instruction",
                    interactive=False,
                )
                answer = gr.Textbox(
                    label="Answer transcript",
                    interactive=False,
                    lines=7,
                    elem_classes="answer-output",
                )
                audio_output = gr.Audio(
                    label="Spoken answer",
                    autoplay=True,
                    interactive=False,
                )

                # NOTE(review): the accordion is nested inside the answer card
                # here — confirm this matches the intended layout.
                with gr.Accordion(
                    "Diagnostics", open=False, elem_classes="system-accordion"
                ):
                    # The status text is computed once, when the UI is built.
                    gr.HTML(f"<p class='mode-note'>{backend_status_text()}</p>")
                    warmup_btn = gr.Button(
                        "Pre-load models",
                        variant="secondary",
                        size="sm",
                        elem_classes="diagnostics-btn",
                    )
                    warmup_output = gr.Textbox(
                        label="Model status",
                        interactive=False,
                        lines=3,
                    )

        gr.HTML(
            """
<footer class="footer-note">
Vision & OCR by Qwen2.5-VL · Speech by Cohere Transcribe · Voice by VoxCPM2.
<span class="priv">Your image is processed only for your request and never stored.</span>
</footer>
"""
        )

        # Event wiring.
        mode.change(
            fn=on_mode_change,
            inputs=mode,
            outputs=[audio_input, typed, submit],
        )
        # The outputs order must match the tuples yielded by run_pipeline.
        submit.click(
            fn=run_pipeline,
            inputs=[image, audio_input, typed, mode, language],
            outputs=[audio_output, answer, question_output, iris, status],
            show_progress="full",
        )
        warmup_btn.click(
            fn=warmup_all,
            inputs=[],
            outputs=[warmup_output],
            show_progress="full",
        )
        # No Python handler: the theme toggle runs only the JS snippet.
        theme_btn.click(fn=None, inputs=None, outputs=None, js=THEME_TOGGLE_JS)
    return demo
|
|
|
|
|
# Built at import time and exposed as a module-level name
# (presumably so a hosting runtime that imports this module can find it).
demo = build_demo()

if __name__ == "__main__":
    # Use ``or`` rather than getenv defaults so a variable that is set but
    # empty falls through to the next choice instead of yielding an empty
    # host or crashing ``int("")``.
    launch_host = os.getenv("THIRD_EYE_HOST") or "0.0.0.0"
    launch_port = int(os.getenv("THIRD_EYE_PORT") or os.getenv("PORT") or "7860")
    demo.queue(default_concurrency_limit=2).launch(
        server_name=launch_host,
        server_port=launch_port,
        show_error=False,
    )
|
|
|