File size: 7,765 Bytes
df6b3ac
26dae50
df6b3ac
 
26dae50
df6b3ac
26dae50
df6b3ac
26dae50
f7a11cd
df6b3ac
26dae50
 
 
 
 
 
 
 
 
f7a11cd
26dae50
 
 
 
 
 
df6b3ac
26dae50
 
 
 
 
5d19f12
df6b3ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f7a11cd
 
 
 
 
 
 
 
 
 
26dae50
df6b3ac
 
 
 
f7a11cd
df6b3ac
 
 
 
 
 
 
 
 
 
26dae50
df6b3ac
 
26dae50
f7a11cd
26dae50
 
 
 
 
 
 
df6b3ac
 
19657cc
 
 
 
 
df6b3ac
 
19657cc
 
 
 
 
df6b3ac
 
 
 
f7a11cd
 
 
 
 
 
19657cc
 
 
 
 
 
 
 
df6b3ac
19657cc
 
 
 
f7a11cd
df6b3ac
19657cc
df6b3ac
 
 
19657cc
 
 
 
df6b3ac
 
 
 
 
f7a11cd
df6b3ac
 
 
 
f7a11cd
df6b3ac
 
 
 
 
 
 
f7a11cd
df6b3ac
 
 
 
26dae50
 
 
 
 
 
 
 
 
 
 
 
137241f
df6b3ac
26dae50
 
df6b3ac
26dae50
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Iris — your father's eyes, by voice (HF Space, ZeroGPU).

gr.Server: serves a custom voice-first frontend (frontend/index.html) and exposes
the API endpoints. Pipeline: Whisper (STT) -> Qwen3-VL (description / VQA) -> Piper (TTS).
"""
import difflib
import os
import re
import tempfile
import time
import unicodedata
from pathlib import Path

# Set the CUDA allocator option ahead of the imports below (hence their
# `noqa: E402`); setdefault leaves any value already in the environment alone.
# NOTE(review): presumably this has to run before `core` pulls in PyTorch — confirm.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True")

from fastapi.responses import HTMLResponse  # noqa: E402
from fastapi.staticfiles import StaticFiles  # noqa: E402
from gradio import Server  # noqa: E402
from gradio.data_classes import FileData  # noqa: E402

from core import stt, trace, tts, vlm  # noqa: E402

# Directory containing this file; frontend/ and assets/ are resolved relative to it.
HERE = Path(__file__).parent
# Server instance on which the API endpoints and routes below are registered.
app = Server(title="Iris")


def _path(f):
    """gr.Server delivers FileData as a dict; accept dict or object."""
    if f is None:
        return None
    return f["path"] if isinstance(f, dict) else f.path


# Voice commands switch live mode on/off. Matching is tolerant of transcription
# errors (e.g. "modo ao fio" heard instead of "modo ao vivo").
# _OFF_RE lists the whole-word "turn live mode off" triggers (PT and EN). It is
# searched against text that _norm has lower-cased and stripped of accents, which
# is why every alternative is written without accents.
_OFF_RE = re.compile(r"\b(pare|parar|para de|deslig\w*|cancela\w*|stop|turn off|"
                     r"silenci\w*|quieto|chega|modo manual|manual)\b")


def _norm(s: str) -> str:
    s = unicodedata.normalize("NFKD", s.lower())
    s = "".join(c for c in s if not unicodedata.combining(c))
    return s.strip().strip(".!?,").strip()


def detect_command(text: str):
    """Map a (possibly mis-transcribed) utterance to a live-mode command.

    The utterance is normalised with ``_norm`` (lower-case, no accents) and
    then checked in this order:

    1. any "off" trigger in ``_OFF_RE``  -> ``"live_off"``
    2. any known "on" phrase, the whole word ``live``, or a close fuzzy match
       of the entire utterance to ``"modo ao vivo"``  -> ``"live_on"``

    Returns ``None`` when *text* is empty or no command is recognised.
    """
    if not text:
        return None
    t = _norm(text)
    # "Off" is tested first, so an utterance containing both kinds of trigger
    # turns live mode off.
    if _OFF_RE.search(t):
        return "live_off"
    # Substring phrases, including common mis-transcriptions ("ao fio", "modo ao").
    on_phrases = ("ao vivo", "ao fio", "modo ao", "descreva sempre", "descreva tudo",
                  "modo continuo", "tempo real", "automatico")
    if any(phrase in t for phrase in on_phrases):
        return "live_on"
    # "live" must be a whole word: a bare substring test also fires on ordinary
    # questions containing "deliver", "alive", "olive", ... and would swallow them
    # as commands instead of answering.
    if re.search(r"\blive\b", t):
        return "live_on"
    # Last resort: the whole utterance is close enough to "modo ao vivo".
    if difflib.SequenceMatcher(None, t, "modo ao vivo").ratio() >= 0.7:
        return "live_on"
    return None


def choose_lang(text: str, detected: str) -> str:
    """Return ``'pt'`` or ``'en'`` for the session.

    Keywords in what the person said take priority (English markers are
    checked before Portuguese ones). Without a keyword, the detected language
    decides: ``'pt'`` stays ``'pt'``, anything else becomes ``'en'``.
    """
    spoken = (text or "").lower()
    keyword_table = (
        ("en", ("ingl", "english")),
        ("pt", ("portug", "brasil")),
    )
    for code, markers in keyword_table:
        if any(marker in spoken for marker in markers):
            return code
    if detected == "pt":
        return "pt"
    return "en"


@app.api(name="describe")
def describe(image: FileData, audio: FileData | None = None, lang: str = "pt",
            qtext: str = "") -> dict:
    """Answer a question about a camera frame, or recognise a voice command.

    The question comes from ``qtext`` (text from browser speech recognition)
    when it is non-blank; otherwise ``audio`` is transcribed, and with neither
    the question is empty.

    Returns a dict. For a live-mode command it carries ``command``,
    ``question``, an empty ``answer`` and ``audio=None``. Otherwise it carries
    ``question``, ``answer`` and ``audio`` (a FileData for the synthesized
    speech, or None when synthesis produced nothing).
    """
    started = time.time()

    typed = qtext.strip() if qtext else ""
    if typed:
        question = typed
    else:
        audio_path = _path(audio)
        question = stt.transcribe(audio_path, language=lang) if audio_path else ""

    # Commands short-circuit: no vision model call, no speech synthesis.
    command = detect_command(question)
    if command:
        print(f"[describe] command {command} <- {question!r}", flush=True)
        return {"command": command, "question": question, "answer": "", "audio": None}

    answer = vlm.describe(_path(image), question, lang=lang)
    if not answer.strip():
        # Blank model output -> speak a fixed fallback in the session language.
        if lang == "en":
            answer = "I couldn't describe that."
        else:
            answer = "Não consegui descrever isso."

    wav = tts.synthesize(answer, lang=lang)
    print(f"[describe] q={question!r} a={answer!r}", flush=True)
    trace.log("describe", lang, question, answer, time.time() - started, vlm.MODEL_ID)

    speech = FileData(path=wav) if wav else None
    return {"question": question, "answer": answer, "audio": speech}


# Live mode: describe ONLY new/relevant things, sparingly (avoid verbosity).
# WATCH_PT / WATCH_EN are the system-prompt templates `watch` uses when no hint
# is given; `{prev}` is filled with what has already been said.
WATCH_PT = (
    "Você observa o ambiente para uma pessoa cega. "
    "Coisas já ditas (não repita): \"{prev}\". "
    "Olhe a cena AGORA. Se apareceu QUALQUER objeto ou pessoa novo — mesmo pequeno "
    "(óculos, celular, copo, papel, comida) — diga em UMA frase curta o que é. "
    "Só responda NADA se a cena estiver basicamente igual ao que já foi dito."
)
WATCH_EN = (
    "You watch the environment for a blind person. "
    "Already said (do not repeat): \"{prev}\". "
    "Look at the scene NOW. If ANY new object or person appeared — even small "
    "(glasses, a phone, a cup, paper, food) — say what it is in ONE short sentence. "
    "Only reply NONE if the scene is basically the same as what was already said."
)
# Replies that `is_quiet` treats as "nothing new" (compared after lower-casing
# and trimming); includes the NADA / NONE sentinels requested by the prompts above.
_QUIET = {"nada", "none", "nenhum", "nothing", "sem novidade", "no change"}


def is_quiet(answer: str) -> bool:
    """Tell whether a live-mode answer amounts to 'nothing new to report'.

    The answer is trimmed, lower-cased and stripped of surrounding ``.``/``!``.
    An empty result (including a ``None`` answer) counts as quiet, as does any
    entry of ``_QUIET``.
    """
    raw = answer or ""
    cleaned = raw.strip().lower().strip(".!").strip()
    if not cleaned:
        return True
    return cleaned in _QUIET


# System-prompt templates `watch` uses when the caller passes a `hint`.
# Their only placeholder is `{prev}` (what has already been said).
HINT_PT = ("Uma pessoa entrou no campo de visão. Em UMA frase de no máximo 12 palavras, "
           "avise de forma útil para um cego (ex.: 'Há alguém à sua frente.'). Descreva só o "
           "que você vê com CERTEZA; não invente; não repita o que já foi dito: \"{prev}\".")
HINT_EN = ("A person entered the field of view. In ONE sentence of at most 12 words, alert a "
           "blind person usefully (e.g. 'Someone is in front of you.'). Describe only what you "
           "see for CERTAIN; do not invent; do not repeat what was already said: \"{prev}\".")


@app.api(name="watch")
def watch(image: FileData, prev: str = "", lang: str = "pt", hint: str = "") -> dict:
    """Live mode: decide whether the current frame deserves a spoken update.

    Three paths, chosen from the (whitespace-trimmed) ``prev`` and ``hint``:

    * no ``prev``  -> baseline description of the scene;
    * ``hint`` set -> the in-browser detector flagged something, so describe it
      using the HINT_* prompt (trust the gate);
    * otherwise    -> pixel-diff fallback with the WATCH_* prompt, which asks the
      model to speak only if something new entered.

    Returns ``{"speak": False, "answer": ""}`` when the answer is quiet, else
    ``{"speak": True, "answer": ..., "audio": ...}`` where ``audio`` is a
    FileData for the synthesized speech or None.
    """
    started = time.time()
    prev = (prev or "").strip()
    hint = (hint or "").strip()
    english = lang == "en"
    frame = _path(image)

    if not prev:
        # Nothing said yet: describe the surroundings as a starting point.
        answer = vlm.describe(frame, lang=lang)
    elif hint:
        # `hint` is handed to format(), but the HINT_* templates only contain a
        # {prev} placeholder, so here it merely selects this branch.
        template = HINT_EN if english else HINT_PT
        answer = vlm.describe(frame, lang=lang,
                              system=template.format(hint=hint, prev=prev))
    else:
        template = WATCH_EN if english else WATCH_PT
        system_prompt = template.format(prev=prev)
        query = "What is new?" if english else "O que há de novo?"
        answer = vlm.describe(frame, question=query, lang=lang, system=system_prompt)

    if is_quiet(answer):
        print(f"[watch] quiet ({answer!r})", flush=True)
        return {"speak": False, "answer": ""}

    wav = tts.synthesize(answer, lang=lang)
    print(f"[watch] SPEAK: {answer!r}", flush=True)
    trace.log("watch", lang, "", answer, time.time() - started, vlm.MODEL_ID)
    speech = FileData(path=wav) if wav else None
    return {"speak": True, "answer": answer, "audio": speech}


@app.api(name="detect_lang")
def detect_lang(audio: FileData) -> dict:
    """Pick the session language from a spoken 'português' or 'english'.

    Transcribes the audio with automatic language detection, then lets
    ``choose_lang`` decide. Returns ``{"lang": ..., "text": ...}`` with the
    chosen language code and the transcript.
    """
    transcript, guessed = stt.transcribe_auto(_path(audio))
    chosen = choose_lang(transcript, guessed)
    print(f"[detect_lang] {transcript!r} (det={guessed}) -> {chosen}", flush=True)
    return {"lang": chosen, "text": transcript}


# Serve the files under frontend/ at the /static URL prefix.
app.mount("/static", StaticFiles(directory=str(HERE / "frontend")), name="static")


@app.get("/")
def index():
    """Serve frontend/index.html as the root page (read from disk on each call)."""
    page = HERE / "frontend" / "index.html"
    markup = page.read_text(encoding="utf-8")
    return HTMLResponse(markup)


if __name__ == "__main__":
    # Optional warm-up, enabled with IRIS_WARMUP=1: one pass through the vision
    # model, then TTS output fed back into STT. Best-effort: a failure is
    # printed and the server is launched regardless.
    warmup_requested = os.environ.get("IRIS_WARMUP") == "1"
    if warmup_requested:
        print("Warmup...", flush=True)
        try:
            warmup_image = HERE / "assets" / "warmup.jpg"
            vlm.describe(str(warmup_image), "test")
            stt.transcribe(tts.synthesize("test"))
            print("Warmup OK", flush=True)
        except Exception as exc:
            print("Warmup failed:", exc, flush=True)
    # Port precedence: GRADIO_SERVER_PORT, then PORT, then 7860.
    fallback_port = os.environ.get("PORT", 7860)
    port = int(os.environ.get("GRADIO_SERVER_PORT", fallback_port))
    app.launch(
        server_name="0.0.0.0",
        server_port=port,
        show_error=True,
        allowed_paths=[tempfile.gettempdir()],
    )