# lisper-zerogpu / app.py
# Hugging Face Space by thomasjvu: "Deploy Lisper ZeroGPU Space" (commit 400c600, verified).
from __future__ import annotations
import base64
import io
import json
import math
import os
import re
import tempfile
import traceback
from typing import Any
from pathlib import Path
# Unsloth's compiled Gemma 4 audio path can trip TorchDynamo on ZeroGPU's
# runtime torch build. Keep inference eager for reliability.
os.environ.setdefault("TORCHDYNAMO_DISABLE", "1")
os.environ.setdefault("UNSLOTH_COMPILE_DISABLE", "1")
import gradio as gr
import librosa
import numpy as np
import soundfile as sf
import spaces
import torch
from transformers import AutoProcessor, Gemma4ForConditionalGeneration
from live_audio_policy import (
LiveAudioPolicy,
compute_live_audio_diagnostics,
decide_live_analysis,
validate_live_audio_diagnostics,
)
# Class labels that normalize_label() will accept from a model or classifier.
ALLOWED_CLASSES = {"clear", "frontal", "lateral", "dental", "palatal"}
# Default Hugging Face repo ids; model_id() / adapter_id() let the
# LISPER_ZERO_GPU_MODEL_ID / LISPER_ZERO_GPU_ADAPTER_ID env vars override them.
DEFAULT_MODEL_ID = "thomasjvu/lisper-gemma4-e2b-audio-full"
DEFAULT_ADAPTER_ID = ""
# Acoustic hint artifacts are looked up next to this file.
SPACE_ROOT = Path(__file__).resolve().parent
ACOUSTIC_MODEL_PATH = SPACE_ROOT / "acoustic_model.json"
ACOUSTIC_EXTRATREES_MODEL_PATH = SPACE_ROOT / "acoustic_extratrees_v18.joblib"
# Number of nearest exemplars that vote in classify_acoustic_knn().
ACOUSTIC_K = 5
# Below this vote share classify_acoustic_knn() reports "clear" instead of the top label.
ACOUSTIC_MIN_CONFIDENCE = 0.42
# Defaults for the k-NN override of the ExtraTrees result (see
# maybe_apply_knn_override); both must be satisfied for the override to apply.
KNN_OVERRIDE_MAX_DISTANCE = 0.25
KNN_OVERRIDE_MIN_CONFIDENCE = 0.90
# Default LiveAudioPolicy thresholds. Each can be overridden through the
# matching LISPER_ZERO_GPU_* environment variable (see live_audio_policy()).
LIVE_CLEAR_MIN_CONFIDENCE = 0.85
LIVE_CLEAR_MIN_MARGIN = 0.25
LIVE_NONCLEAR_MIN_CONFIDENCE = 0.55
LIVE_NONCLEAR_MIN_MARGIN = 0.12
MIN_AUDIO_SECONDS = 0.45
MIN_AUDIO_RMS = 0.0015
MIN_AUDIO_PEAK = 0.012
MIN_VOICED_RATIO = 0.002
MIN_SPEECH_FRAME_RATIO = 0.04
MIN_TONAL_FRAME_RATIO = 0.04
MIN_SIBILANT_FRAME_RATIO = 0.015
MAX_NOISE_FLATNESS = 0.40
MAX_CLIPPING_RATIO = 0.08
# Base instruction sent with every clip (see build_messages). The four labels
# named here are the ones parse_response() later extracts line by line.
DEFAULT_PROMPT = """Analyze this pronunciation attempt for lisp type and give concise corrective coaching.
Return exactly four labeled lines in this order:
Detected class: clear|frontal|lateral|dental|palatal
Reason: one brief reason tied to tongue placement or airflow
Corrective cue: one concrete next-step cue
Encouragement: one brief supportive line"""
# Canned "reason" / "cue" wording per detected class. These replace whatever
# the language model wrote when an acoustic result is enforced
# (enforce_acoustic_response, build_detected_acoustic_response).
CLASS_TEMPLATES = {
    "clear": {
        "reason": "The acoustic pattern did not strongly match the trained lisp-pattern examples, so this is treated as a tentative clear result.",
        "cue": "Repeat once at a relaxed pace and keep the airflow centered through the front of the mouth.",
    },
    "dental": {
        "reason": "The acoustic pattern is closest to the dental examples, where tongue contact near the teeth can narrow the /s/ groove.",
        "cue": "Relax the tongue slightly off the teeth and keep a narrow stream of air moving forward.",
    },
    "frontal": {
        "reason": "The acoustic pattern is closest to the frontal examples, which often sound like the airflow is too far forward.",
        "cue": "Keep the tongue tip just behind the upper teeth and send the air straight forward through a small groove.",
    },
    "lateral": {
        "reason": "The acoustic pattern is closest to the lateral examples, where air may be escaping around the sides of the tongue.",
        "cue": "Start from a light /t/ position, seal the tongue sides, and let the air move forward through the center.",
    },
    "palatal": {
        "reason": "The acoustic pattern is closest to the palatal examples, where the tongue can sit too far back and muffle the sound.",
        "cue": "Bring the tongue tip slightly forward behind the upper teeth and brighten the airflow.",
    },
}
# Alternative wording used by enforce_acoustic_response() when the acoustic
# result carries a truthy "live_clear_guard_applied" flag. There is no "clear"
# entry; that class falls back to CLASS_TEMPLATES.
GUARDED_CLASS_TEMPLATES = {
    "dental": {
        "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is dental, which can happen when the tongue presses too close to the teeth.",
        "cue": "Try one slower repetition with the tongue relaxed just behind the teeth and the air moving forward through a narrow center groove.",
    },
    "frontal": {
        "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is frontal, which can happen when the tongue or airflow moves too far forward.",
        "cue": "Keep the tongue tip behind the upper teeth and avoid letting it push between the teeth during /s/ sounds.",
    },
    "lateral": {
        "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is lateral, where air may be leaking around the tongue sides.",
        "cue": "Seal the tongue sides lightly against the upper molars and aim the air straight down the middle.",
    },
    "palatal": {
        "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is palatal, where the tongue may be sitting too far back.",
        "cue": "Bring the tongue tip forward just behind the upper teeth and brighten the /s/ airflow.",
    },
}
# Browser-side "Record" handler (JavaScript source kept as a raw string).
# It opens the microphone with getUserMedia, taps the stream through a
# 4096-sample ScriptProcessor on a 16 kHz AudioContext, accumulates Float32
# chunks plus running peak / sum-of-squares, and stores everything on
# window.__lisperRecorder. It returns a [payload, status, playback] triple.
# NOTE(review): a rejected getUserMedia promise (e.g. permission denied) is not
# caught inside this snippet — confirm the caller surfaces that error.
BROWSER_RECORDER_START_JS = r"""
async (payload) => {
const state = window.__lisperRecorder || {};
if (state.recording) {
return [payload || "", "Recording is already active.", ""];
}
if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
return [payload || "", "This browser cannot access microphone recording.", ""];
}
const stream = await navigator.mediaDevices.getUserMedia({
audio: {
channelCount: 1,
echoCancellation: false,
noiseSuppression: false,
autoGainControl: true,
},
video: false,
});
const AudioContextCtor = window.AudioContext || window.webkitAudioContext;
const audioContext = new AudioContextCtor({ sampleRate: 16000 });
await audioContext.resume();
const source = audioContext.createMediaStreamSource(stream);
const processor = audioContext.createScriptProcessor(4096, 1, 1);
const silentGain = audioContext.createGain();
silentGain.gain.value = 0;
const chunks = [];
let peak = 0;
let sumSquares = 0;
let sampleCount = 0;
processor.onaudioprocess = (event) => {
if (!window.__lisperRecorder?.recording) {
return;
}
const input = event.inputBuffer.getChannelData(0);
const copy = new Float32Array(input.length);
copy.set(input);
chunks.push(copy);
for (let i = 0; i < copy.length; i += 1) {
const value = copy[i];
const absValue = Math.abs(value);
if (absValue > peak) peak = absValue;
sumSquares += value * value;
}
sampleCount += copy.length;
};
source.connect(processor);
processor.connect(silentGain);
silentGain.connect(audioContext.destination);
window.__lisperRecorder = {
recording: true,
stream,
audioContext,
source,
processor,
silentGain,
chunks,
startedAt: Date.now(),
getStats: () => ({
peak,
rms: sampleCount ? Math.sqrt(sumSquares / sampleCount) : 0,
sampleCount,
sampleRate: audioContext.sampleRate,
}),
};
return ["", "Recording through Web Audio... press Stop when finished.", ""];
}
"""
# Browser-side "Stop" handler (JavaScript source kept as a raw string).
# It tears down the audio graph created by the start handler, concatenates the
# captured chunks, encodes them as a mono 16-bit PCM WAV, and returns
# [payload, status, playback]. The payload is a JSON string whose "data_url"
# field carries the WAV as a base64 data URL — the format that
# decode_browser_audio_payload() parses on the Python side.
BROWSER_RECORDER_STOP_JS = r"""
async () => {
const state = window.__lisperRecorder;
if (!state || !state.recording) {
return ["", "No active browser recording. Press Record first.", ""];
}
state.recording = false;
try { state.processor.disconnect(); } catch (_) {}
try { state.source.disconnect(); } catch (_) {}
try { state.silentGain.disconnect(); } catch (_) {}
for (const track of state.stream.getTracks()) {
track.stop();
}
const stats = state.getStats();
const sampleRate = stats.sampleRate || 16000;
const totalLength = state.chunks.reduce((sum, chunk) => sum + chunk.length, 0);
const samples = new Float32Array(totalLength);
let offset = 0;
for (const chunk of state.chunks) {
samples.set(chunk, offset);
offset += chunk.length;
}
await state.audioContext.close().catch(() => undefined);
window.__lisperRecorder = null;
function writeString(view, byteOffset, string) {
for (let i = 0; i < string.length; i += 1) {
view.setUint8(byteOffset + i, string.charCodeAt(i));
}
}
function encodeWav(floatSamples, wavSampleRate) {
const bytesPerSample = 2;
const blockAlign = bytesPerSample;
const buffer = new ArrayBuffer(44 + floatSamples.length * bytesPerSample);
const view = new DataView(buffer);
writeString(view, 0, "RIFF");
view.setUint32(4, 36 + floatSamples.length * bytesPerSample, true);
writeString(view, 8, "WAVE");
writeString(view, 12, "fmt ");
view.setUint32(16, 16, true);
view.setUint16(20, 1, true);
view.setUint16(22, 1, true);
view.setUint32(24, wavSampleRate, true);
view.setUint32(28, wavSampleRate * blockAlign, true);
view.setUint16(32, blockAlign, true);
view.setUint16(34, 16, true);
writeString(view, 36, "data");
view.setUint32(40, floatSamples.length * bytesPerSample, true);
let byteOffset = 44;
for (let i = 0; i < floatSamples.length; i += 1, byteOffset += 2) {
const clamped = Math.max(-1, Math.min(1, floatSamples[i]));
view.setInt16(byteOffset, clamped < 0 ? clamped * 0x8000 : clamped * 0x7fff, true);
}
return new Blob([view], { type: "audio/wav" });
}
const blob = encodeWav(samples, sampleRate);
const dataUrl = await new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = () => resolve(reader.result);
reader.onerror = () => reject(reader.error);
reader.readAsDataURL(blob);
});
const durationSeconds = sampleRate ? samples.length / sampleRate : 0;
const payload = JSON.stringify({
source: "browser-web-audio-wav",
data_url: dataUrl,
mime_type: "audio/wav",
sample_rate: sampleRate,
sample_count: samples.length,
duration_seconds: Number(durationSeconds.toFixed(3)),
peak: Number(stats.peak.toFixed(6)),
rms: Number(stats.rms.toFixed(6)),
created_at: new Date().toISOString(),
});
const status = stats.peak < 0.003
? `Clip captured but appears very quiet. peak=${stats.peak.toFixed(6)} rms=${stats.rms.toFixed(6)}. Check browser microphone permission/input.`
: `Clip ready: ${durationSeconds.toFixed(1)}s, peak=${stats.peak.toFixed(3)}. Playback should contain your voice.`;
const playback = `<audio controls src="${dataUrl}" style="width:100%;"></audio>`;
return [payload, status, playback];
}
"""
# Browser-side "Clear" handler (JavaScript source kept as a raw string).
# It stops any recording still in progress, drops window.__lisperRecorder, and
# returns an empty payload, a "nothing ready" status, and empty playback markup.
BROWSER_RECORDER_CLEAR_JS = r"""
async () => {
const state = window.__lisperRecorder;
if (state?.recording) {
state.recording = false;
try { state.processor.disconnect(); } catch (_) {}
try { state.source.disconnect(); } catch (_) {}
try { state.silentGain.disconnect(); } catch (_) {}
for (const track of state.stream.getTracks()) {
track.stop();
}
await state.audioContext.close().catch(() => undefined);
}
window.__lisperRecorder = null;
return ["", "No browser recording ready.", ""];
}
"""
class InvalidAudioError(ValueError):
    """Signal that a clip was rejected before any class could be assigned.

    Raised in this file when the browser payload cannot be decoded and when
    the live-audio gate does not accept the clip.

    Attributes:
        diagnostics: Dictionary describing why the clip was rejected; it is
            stored as given and never copied.
    """

    def __init__(self, message: str, diagnostics: dict[str, Any]):
        # Attach the diagnostics first so they exist even if a subclass
        # inspects the instance during ValueError initialisation.
        self.diagnostics = diagnostics
        super().__init__(message)
def env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to look up.
        default: Value used when the variable is unset or not a valid integer.

    Returns:
        The parsed integer, or ``default`` when parsing fails.
    """
    configured = os.environ.get(name)
    candidate = str(default) if configured is None else configured
    try:
        parsed = int(candidate)
    except ValueError:
        return default
    return parsed
def env_float(name: str, default: float) -> float:
    """Read a floating-point setting from the environment.

    Args:
        name: Environment variable to look up.
        default: Value used when the variable is unset or not a valid float.

    Returns:
        The parsed float, or ``default`` when parsing fails.
    """
    configured = os.environ.get(name)
    candidate = str(default) if configured is None else configured
    try:
        parsed = float(candidate)
    except ValueError:
        return default
    return parsed
def model_id() -> str:
    """Return the model repo id, preferring ``LISPER_ZERO_GPU_MODEL_ID``.

    A blank or whitespace-only override falls back to ``DEFAULT_MODEL_ID``.
    """
    configured = os.environ.get("LISPER_ZERO_GPU_MODEL_ID", DEFAULT_MODEL_ID).strip()
    if configured:
        return configured
    return DEFAULT_MODEL_ID
def adapter_id() -> str:
    """Return the adapter repo id from ``LISPER_ZERO_GPU_ADAPTER_ID``.

    Surrounding whitespace is removed; an empty string means no adapter.
    """
    configured = os.environ.get("LISPER_ZERO_GPU_ADAPTER_ID", DEFAULT_ADAPTER_ID)
    return configured.strip()
def max_new_tokens() -> int:
    """Return the ``LISPER_ZERO_GPU_MAX_NEW_TOKENS`` setting (default 96)."""
    token_budget = env_int("LISPER_ZERO_GPU_MAX_NEW_TOKENS", 96)
    return token_budget
def max_seq_length() -> int:
    """Return the ``LISPER_ZERO_GPU_MAX_SEQ_LENGTH`` setting (default 2048)."""
    sequence_limit = env_int("LISPER_ZERO_GPU_MAX_SEQ_LENGTH", 2048)
    return sequence_limit
def zero_gpu_size() -> str:
    """Return the requested ZeroGPU size from ``LISPER_ZERO_GPU_SIZE``.

    Only ``"xlarge"`` (case-insensitive, whitespace ignored) is honoured;
    every other value, including an unset variable, yields ``"large"``.
    """
    wanted = os.environ.get("LISPER_ZERO_GPU_SIZE", "large").strip().lower()
    if wanted == "xlarge":
        return "xlarge"
    return "large"
def eager_load_enabled() -> bool:
    """Tell whether the model should be loaded at import time.

    Controlled by ``LISPER_ZERO_GPU_EAGER_LOAD``; unset counts as ``"0"``
    (disabled), and any value other than ``"0"`` enables eager loading.
    """
    flag = os.environ.get("LISPER_ZERO_GPU_EAGER_LOAD", "0").strip()
    return not flag == "0"
def load_in_4bit_enabled() -> bool:
    """Tell whether 4-bit loading is requested.

    ``LISPER_ZERO_GPU_LOAD_IN_4BIT`` decides; when it is unset the default is
    on if an adapter id is configured and off otherwise. Any value other
    than ``"0"`` counts as enabled.
    """
    fallback = "1" if adapter_id() else "0"
    flag = os.environ.get("LISPER_ZERO_GPU_LOAD_IN_4BIT", fallback).strip()
    return not flag == "0"
def acoustic_hint_enabled() -> bool:
    """Tell whether the acoustic hint models may be used.

    Controlled by ``LISPER_ZERO_GPU_ACOUSTIC_HINT``; enabled unless the
    variable is exactly ``"0"`` after stripping whitespace.
    """
    flag = os.environ.get("LISPER_ZERO_GPU_ACOUSTIC_HINT", "1").strip()
    return not flag == "0"
def acoustic_model_preference() -> str:
    """Return which acoustic classifier to use.

    Reads ``LISPER_ZERO_GPU_ACOUSTIC_MODEL``. ``"extratrees"`` and ``"knn"``
    are passed through (case-insensitive); anything else becomes ``"auto"``.
    """
    choice = os.environ.get("LISPER_ZERO_GPU_ACOUSTIC_MODEL", "auto").strip().lower()
    return choice if choice in ("extratrees", "knn") else "auto"
def live_clear_min_confidence() -> float:
    """Return the "clear" confidence threshold, overridable via
    ``LISPER_ZERO_GPU_LIVE_CLEAR_MIN_CONFIDENCE``."""
    threshold = env_float("LISPER_ZERO_GPU_LIVE_CLEAR_MIN_CONFIDENCE", LIVE_CLEAR_MIN_CONFIDENCE)
    return threshold
def live_clear_min_margin() -> float:
    """Return the "clear" margin threshold, overridable via
    ``LISPER_ZERO_GPU_LIVE_CLEAR_MIN_MARGIN``."""
    threshold = env_float("LISPER_ZERO_GPU_LIVE_CLEAR_MIN_MARGIN", LIVE_CLEAR_MIN_MARGIN)
    return threshold
def live_nonclear_min_confidence() -> float:
    """Return the non-clear confidence threshold.

    ``LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_CONFIDENCE`` wins when set and valid;
    otherwise ``LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_SCORE`` is consulted, and
    finally the ``LIVE_NONCLEAR_MIN_CONFIDENCE`` constant.
    """
    secondary = env_float("LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_SCORE", LIVE_NONCLEAR_MIN_CONFIDENCE)
    return env_float("LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_CONFIDENCE", secondary)
def live_nonclear_min_margin() -> float:
    """Return the non-clear margin threshold, overridable via
    ``LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_MARGIN``."""
    threshold = env_float("LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_MARGIN", LIVE_NONCLEAR_MIN_MARGIN)
    return threshold
def knn_override_max_distance() -> float:
    """Return the largest nearest-exemplar distance that still allows a k-NN
    override, overridable via ``LISPER_ZERO_GPU_KNN_OVERRIDE_MAX_DISTANCE``."""
    limit = env_float("LISPER_ZERO_GPU_KNN_OVERRIDE_MAX_DISTANCE", KNN_OVERRIDE_MAX_DISTANCE)
    return limit
def knn_override_min_confidence() -> float:
    """Return the smallest k-NN confidence that still allows an override,
    overridable via ``LISPER_ZERO_GPU_KNN_OVERRIDE_MIN_CONFIDENCE``."""
    limit = env_float("LISPER_ZERO_GPU_KNN_OVERRIDE_MIN_CONFIDENCE", KNN_OVERRIDE_MIN_CONFIDENCE)
    return limit
def live_audio_policy() -> LiveAudioPolicy:
    """Build a ``LiveAudioPolicy`` from the environment.

    Every threshold starts from its module-level default and may be replaced
    through the matching ``LISPER_ZERO_GPU_*`` environment variable. The four
    confidence/margin thresholds come from their dedicated accessor functions.
    """
    # policy field -> (environment variable, fallback constant)
    env_backed = {
        "min_audio_seconds": ("LISPER_ZERO_GPU_MIN_AUDIO_SECONDS", MIN_AUDIO_SECONDS),
        "min_peak": ("LISPER_ZERO_GPU_MIN_AUDIO_PEAK", MIN_AUDIO_PEAK),
        "min_rms": ("LISPER_ZERO_GPU_MIN_AUDIO_RMS", MIN_AUDIO_RMS),
        "min_voiced_ratio": ("LISPER_ZERO_GPU_MIN_VOICED_RATIO", MIN_VOICED_RATIO),
        "min_speech_frame_ratio": ("LISPER_ZERO_GPU_MIN_SPEECH_FRAME_RATIO", MIN_SPEECH_FRAME_RATIO),
        "min_tonal_frame_ratio": ("LISPER_ZERO_GPU_MIN_TONAL_FRAME_RATIO", MIN_TONAL_FRAME_RATIO),
        "min_sibilant_frame_ratio": ("LISPER_ZERO_GPU_MIN_SIBILANT_FRAME_RATIO", MIN_SIBILANT_FRAME_RATIO),
        "max_noise_flatness": ("LISPER_ZERO_GPU_MAX_NOISE_FLATNESS", MAX_NOISE_FLATNESS),
        "max_clipping_ratio": ("LISPER_ZERO_GPU_MAX_CLIPPING_RATIO", MAX_CLIPPING_RATIO),
    }
    settings = {
        field: env_float(variable, fallback)
        for field, (variable, fallback) in env_backed.items()
    }
    settings["clear_min_confidence"] = live_clear_min_confidence()
    settings["clear_min_margin"] = live_clear_min_margin()
    settings["nonclear_min_confidence"] = live_nonclear_min_confidence()
    settings["nonclear_min_margin"] = live_nonclear_min_margin()
    return LiveAudioPolicy(**settings)
def audio_alignment_enabled() -> bool:
    """Tell whether audio-token alignment is requested.

    ``LISPER_ZERO_GPU_ALIGN_AUDIO_TOKENS`` decides; when it is unset the
    default is off if an adapter id is configured and on otherwise. Any value
    other than ``"0"`` counts as enabled.
    """
    fallback = "0" if adapter_id() else "1"
    flag = os.environ.get("LISPER_ZERO_GPU_ALIGN_AUDIO_TOKENS", fallback).strip()
    return not flag == "0"
def gemma_generation_enabled() -> bool:
    """Tell whether Gemma text generation is switched on.

    Controlled by ``LISPER_ZERO_GPU_USE_GEMMA_GENERATION``; unset counts as
    ``"0"`` (off), and any value other than ``"0"`` turns it on.
    """
    flag = os.environ.get("LISPER_ZERO_GPU_USE_GEMMA_GENERATION", "0").strip()
    return not flag == "0"
def torch_dtype() -> torch.dtype:
    """Return the model dtype selected by ``LISPER_ZERO_GPU_DTYPE``.

    ``"bfloat16"`` and ``"float32"`` are recognised (case-insensitive); any
    other value, including an unset variable, yields ``torch.float16``.
    """
    wanted = os.environ.get("LISPER_ZERO_GPU_DTYPE", "float16").strip().lower()
    recognised = {"bfloat16": torch.bfloat16, "float32": torch.float32}
    return recognised.get(wanted, torch.float16)
def auth_token() -> str | None:
    """Return the stripped ``HF_TOKEN`` value, or ``None`` when it is unset or blank."""
    raw = os.environ.get("HF_TOKEN", "")
    cleaned = raw.strip()
    if not cleaned:
        return None
    return cleaned
def decode_browser_audio_payload(payload: str) -> np.ndarray:
    """Turn the browser recorder's JSON payload into a 16 kHz mono waveform.

    Args:
        payload: JSON text whose ``data_url`` field holds a base64 data URL of
            an audio file readable by ``soundfile``.

    Returns:
        A one-dimensional ``float32`` array sampled at 16 kHz.

    Raises:
        InvalidAudioError: If the payload cannot be parsed, carries no audio
            data, or the audio bytes cannot be read.
    """
    try:
        record = json.loads(payload)
        data_url = str(record.get("data_url") or "")
        _header, comma, encoded = data_url.partition(",")
        if not comma:
            raise ValueError("Browser recorder payload is missing audio data.")
        raw_bytes = base64.b64decode(encoded)
        waveform, sample_rate = sf.read(io.BytesIO(raw_bytes), dtype="float32", always_2d=False)
    except Exception as exc:
        # Any failure on this path means the clip is unusable; report it as
        # one uniform, user-facing error type.
        message = f"Browser recording could not be decoded: {type(exc).__name__}: {exc}"
        raise InvalidAudioError(message, {"status": "invalid_browser_audio_payload"}) from exc

    is_multichannel = waveform.ndim > 1
    if is_multichannel:
        waveform = waveform.mean(axis=1)
    needs_resample = sample_rate != 16000
    if needs_resample:
        waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000)
    return waveform.astype(np.float32)
def normalize_audio(audio_value: str | tuple[int, np.ndarray] | None) -> np.ndarray:
    """Convert any supported audio input into a bounded 16 kHz mono waveform.

    Args:
        audio_value: One of
            * ``None`` or a blank string — nothing was recorded;
            * a JSON string produced by the browser recorder;
            * a path to an audio file;
            * a ``(sample_rate, samples)`` tuple.

    Returns:
        A one-dimensional ``float32`` array at 16 kHz, at most 12 seconds
        long, with values inside ``[-1, 1]``.

    Raises:
        gr.Error: If no audio was supplied.
        InvalidAudioError: If a browser payload cannot be decoded.
    """
    # The browser "Clear" handler returns "" as its payload, so a blank string
    # means "nothing recorded" rather than a file path to open.
    nothing_supplied = audio_value is None or (
        isinstance(audio_value, str) and not audio_value.strip()
    )
    if nothing_supplied:
        raise gr.Error("Record or upload a short audio clip first.")

    if isinstance(audio_value, str):
        if audio_value.strip().startswith("{"):
            waveform = decode_browser_audio_payload(audio_value)
            sample_rate = 16000
        else:
            waveform, sample_rate = sf.read(audio_value, dtype="float32", always_2d=False)
    else:
        sample_rate, waveform = audio_value

    samples = np.asarray(waveform)
    if np.issubdtype(samples.dtype, np.signedinteger):
        # Integer PCM must be scaled by its full-scale value. Falling through
        # to the peak normalisation below would boost a near-silent clip to
        # peak 1.0 and let it slip past the loudness gate.
        full_scale = float(np.iinfo(samples.dtype).max) + 1.0
        samples = samples.astype(np.float32) / full_scale
    else:
        samples = samples.astype(np.float32)

    if samples.ndim > 1:
        samples = samples.mean(axis=1)
    if sample_rate != 16000:
        samples = librosa.resample(samples, orig_sr=sample_rate, target_sr=16000)

    # Keep ZeroGPU requests bounded.
    max_samples = 12 * 16000
    if samples.shape[0] > max_samples:
        samples = samples[:max_samples]

    # Floating-point input that overshoots full scale is pulled back to 1.0.
    peak = float(np.max(np.abs(samples))) if samples.size else 0.0
    if peak > 1.0:
        samples = samples / peak
    return samples.astype(np.float32)
def audio_diagnostics(waveform: np.ndarray, sample_rate: int = 16000) -> dict[str, Any]:
    """Compute live-audio diagnostics for ``waveform`` under the current policy.

    Thin wrapper around ``compute_live_audio_diagnostics`` that supplies the
    environment-derived ``LiveAudioPolicy``.
    """
    active_policy = live_audio_policy()
    return compute_live_audio_diagnostics(waveform, sample_rate=sample_rate, policy=active_policy)
def validate_audio_for_analysis(waveform: np.ndarray) -> dict[str, Any]:
    """Run the live-audio gate and return the clip diagnostics when it passes.

    Raises:
        InvalidAudioError: If the gate's status is anything other than
            ``"accepted"``. The gate decision is attached to the diagnostics
            under ``"live_audio_gate"`` before raising.
    """
    clip_report = audio_diagnostics(waveform)
    gate = validate_live_audio_diagnostics(clip_report, live_audio_policy())
    if gate["status"] == "accepted":
        return clip_report
    clip_report["live_audio_gate"] = gate
    raise InvalidAudioError(str(gate["reason"]), clip_report)
def write_temp_audio(waveform: np.ndarray) -> str:
    """Write ``waveform`` to a fresh temporary 16 kHz WAV file and return its path.

    The file is not removed here; cleanup is left to the caller.
    """
    handle, wav_path = tempfile.mkstemp(suffix=".wav", prefix="lisper-zero-gpu-")
    # Only the path is needed; soundfile opens the file itself.
    os.close(handle)
    sf.write(wav_path, waveform, 16000)
    return wav_path
def build_messages(target_text: str, audio_url: str, acoustic_result: dict[str, Any] | None = None) -> list[dict[str, Any]]:
    """Assemble the chat messages sent to the model for one clip.

    Args:
        target_text: Phrase the speaker attempted; omitted from the prompt
            when blank.
        audio_url: Location of the audio clip attached to the user turn.
        acoustic_result: Optional acoustic pre-analysis. When given, its
            ``detected_class`` and ``confidence`` are quoted in the prompt and
            the model is told to use that class.

    Returns:
        A two-element list: the system message and the user message.
    """
    prompt_parts = [DEFAULT_PROMPT]

    cleaned_target = target_text.strip()
    if cleaned_target:
        prompt_parts.append(f'\n\nTarget text: "{cleaned_target}"')

    if acoustic_result:
        prompt_parts.append(
            "\n\nAcoustic pre-analysis from the waveform: "
            f"class={acoustic_result['detected_class']}, "
            f"confidence={acoustic_result['confidence']:.3f}. "
            "Use this exact class for the Detected class line. Do not override it."
        )

    instruction = "".join(prompt_parts)

    system_message = {
        "role": "system",
        "content": [
            {
                "type": "text",
                "text": "You are Lisper, a supportive speech-therapy assistant focused on concise lisp coaching.",
            }
        ],
    }
    user_message = {
        "role": "user",
        "content": [
            {"type": "audio", "url": audio_url},
            {"type": "text", "text": instruction},
        ],
    }
    return [system_message, user_message]
def build_runtime() -> tuple[Any, Any]:
    """Load the processor and model and return them as ``(processor, model)``.

    Two paths exist:

    * an adapter id is configured — the adapter repo is loaded through
      Unsloth's ``FastVisionModel`` and switched to inference mode;
    * otherwise — the base repo is loaded with ``AutoProcessor`` and
      ``Gemma4ForConditionalGeneration`` onto CUDA.

    In both cases the model is put into eval mode before returning.
    """
    repo_id = model_id()
    adapter_repo_id = adapter_id()
    token = auth_token()
    if adapter_repo_id:
        # Imported lazily so the dependency is only needed on the adapter path.
        import unsloth  # noqa: F401
        from unsloth import FastVisionModel
        kwargs = {
            "model_name": adapter_repo_id,
            "max_seq_length": max_seq_length(),
            "load_in_4bit": load_in_4bit_enabled(),
            "full_finetuning": False,
        }
        # Only pass a token when one is configured.
        if token:
            kwargs["token"] = token
        model, processor = FastVisionModel.from_pretrained(**kwargs)
        FastVisionModel.for_inference(model)
        model.eval()
        return processor, model
    # NOTE(review): the adapter branch above always returns, so
    # adapter_repo_id is empty here and this always resolves to repo_id.
    processor_source = adapter_repo_id or repo_id
    # NOTE(review): trust_remote_code=True executes code shipped in the repo;
    # keep the repo id pinned to a trusted source.
    processor = AutoProcessor.from_pretrained(processor_source, token=token, trust_remote_code=True)
    model = Gemma4ForConditionalGeneration.from_pretrained(
        repo_id,
        token=token,
        torch_dtype=torch_dtype(),
        device_map={"": "cuda"},
        trust_remote_code=True,
    )
    model.eval()
    return processor, model
# Cached (processor, model) pair. It is built at import time only when eager
# loading is enabled; otherwise load_runtime() fills it on first use.
RUNTIME: tuple[Any, Any] | None = build_runtime() if eager_load_enabled() else None
# NOTE(review): not read or written in the part of the file visible here;
# presumably maintained by the inference path further down — confirm.
LAST_INPUT_SUMMARY: dict[str, Any] = {}
def load_runtime() -> tuple[Any, Any]:
    """Return the cached ``(processor, model)`` pair, building it on first use."""
    global RUNTIME
    if RUNTIME is not None:
        return RUNTIME
    RUNTIME = build_runtime()
    return RUNTIME
def strip_generation_artifacts(text: str) -> str:
    """Remove code-fence markers and ``<bos>`` tokens, then trim whitespace."""
    cleaned = text
    for artifact in ("```", "<bos>"):
        cleaned = cleaned.replace(artifact, "")
    return cleaned.strip()
def extract_line(label: str, text: str) -> str:
    """Return the value that follows ``label:`` on its own line in ``text``.

    Matching is case-insensitive and anchored to the start of a line.

    Args:
        label: Literal label to look for (it is not treated as a regex).
        text: Multi-line response to search.

    Returns:
        The stripped value on the first matching line, or ``""`` when the
        label is absent or its line carries no value.
    """
    # Only spaces and tabs may separate the colon from the value. ``\s*``
    # would also swallow the newline, so an empty "Reason:" line would wrongly
    # capture the next line's text.
    pattern = rf"^{re.escape(label)}:[ \t]*(.+)$"
    found = re.search(pattern, text, flags=re.IGNORECASE | re.MULTILINE)
    if found is None:
        return ""
    return found.group(1).strip()
def normalize_label(text: str) -> str:
    """Map free-form text onto one of the allowed class labels.

    Args:
        text: Raw label text, e.g. the value of a "Detected class" line.

    Returns:
        ``"inconclusive"`` when the text mentions "inconclusive" or "unclear",
        or names no allowed class. Otherwise the allowed class it names; when
        several are named, the one that appears first in the text wins.
    """
    value = text.strip().lower()
    if "inconclusive" in value or "unclear" in value:
        return "inconclusive"
    if value in ALLOWED_CLASSES:
        return value
    # ALLOWED_CLASSES is a set, so its iteration order is not stable between
    # runs. Ranking by position in the text keeps the answer deterministic
    # when more than one class name is present.
    mentions = [
        (value.find(candidate), candidate)
        for candidate in ALLOWED_CLASSES
        if candidate in value
    ]
    if not mentions:
        return "inconclusive"
    return min(mentions)[1]
def parse_response(response: str) -> dict[str, Any]:
    """Split a four-line coaching response into its labelled fields.

    Returns:
        A dictionary with the normalised ``detected_class``, the three text
        fields (empty strings when a line is missing), the untouched
        ``raw_response``, and the configured ``model_id`` / ``adapter_id``
        (``None`` when no adapter is set).
    """
    class_text = extract_line("Detected class", response)
    parsed: dict[str, Any] = {"detected_class": normalize_label(class_text)}
    parsed["reason"] = extract_line("Reason", response)
    parsed["corrective_cue"] = extract_line("Corrective cue", response)
    parsed["encouragement"] = extract_line("Encouragement", response)
    parsed["raw_response"] = response
    parsed["model_id"] = model_id()
    parsed["adapter_id"] = adapter_id() or None
    return parsed
def acoustic_normalize_audio(audio: np.ndarray) -> np.ndarray:
    """Flatten, DC-centre, and peak-scale a waveform for feature extraction.

    The result is a one-dimensional ``float32`` array whose mean has been
    removed and whose largest absolute sample is 0.98. Empty input and
    all-constant input are returned without scaling.
    """
    flat = np.asarray(audio, dtype=np.float32).reshape(-1)
    if flat.size == 0:
        return flat
    centered = flat - float(np.mean(flat))
    largest = float(np.max(np.abs(centered)))
    if largest > 0:
        centered = centered * (0.98 / largest)
    return centered.astype(np.float32)
def frame_audio(audio: np.ndarray, sr: int, frame_ms: float = 25.0, hop_ms: float = 10.0) -> np.ndarray:
    """Slice a 1-D waveform into overlapping fixed-length frames.

    Args:
        audio: One-dimensional waveform.
        sr: Sample rate in Hz.
        frame_ms: Frame length in milliseconds.
        hop_ms: Step between frame starts in milliseconds.

    Returns:
        A freshly allocated array of shape ``(n_frames, frame_length)``.
        Audio shorter than one frame is zero-padded at the end so that
        exactly one frame is produced; trailing samples that do not fill a
        whole frame are dropped.
    """
    frame_length = max(1, int(sr * frame_ms / 1000))
    hop_length = max(1, int(sr * hop_ms / 1000))
    shortfall = frame_length - len(audio)
    if shortfall > 0:
        audio = np.pad(audio, (0, shortfall))
    # Every possible window, thinned to one per hop; the copy detaches the
    # result from the read-only view.
    every_window = np.lib.stride_tricks.sliding_window_view(audio, frame_length)
    return every_window[::hop_length].copy()
def summarize_feature_values(values: np.ndarray) -> list[float]:
    """Reduce a series to six summary statistics.

    Non-finite entries are ignored. The statistics are, in order: mean,
    standard deviation, minimum, maximum, 10th percentile, 90th percentile.
    When no finite value remains, six zeros are returned.
    """
    series = np.asarray(values, dtype=np.float64)
    finite = series[np.isfinite(series)]
    if finite.size == 0:
        return [0.0] * 6
    reducers = (
        np.mean,
        np.std,
        np.min,
        np.max,
        lambda data: np.percentile(data, 10),
        lambda data: np.percentile(data, 90),
    )
    return [float(reduce(finite)) for reduce in reducers]
def extract_acoustic_features(audio: np.ndarray, sr: int = 16000) -> np.ndarray:
    """Compute the 88-value acoustic feature vector for one clip.

    The vector holds 4 clip-level values (duration, mean, standard deviation,
    peak of the normalised audio) followed by 6 summary statistics for each of
    14 per-frame series: spectral centroid, bandwidth, 85% roll-off, flatness,
    zero-crossing rate, RMS, normalised spectral entropy, centroid delta, and
    six band-energy ratios.

    Args:
        audio: Waveform samples; an empty array yields an all-zero vector.
        sr: Sample rate used for framing and the frequency axis. The audio is
            not resampled here.

    Returns:
        A ``float32`` array of length 88.
    """
    if audio.size == 0:
        return np.zeros(88, dtype=np.float32)
    audio = acoustic_normalize_audio(audio)
    frames = frame_audio(audio, sr)
    # Hann-windowed magnitude spectrum of every frame.
    window = np.hanning(frames.shape[1]).astype(np.float32)
    spectra = np.abs(np.fft.rfft(frames * window, axis=1)).astype(np.float64)
    freqs = np.fft.rfftfreq(frames.shape[1], d=1.0 / sr).astype(np.float64)
    power = spectra**2
    # eps keeps the divisions and logarithms finite on silent frames.
    eps = 1e-10
    total = power.sum(axis=1) + eps
    centroid = (power * freqs).sum(axis=1) / total
    bandwidth = np.sqrt((power * (freqs[None, :] - centroid[:, None]) ** 2).sum(axis=1) / total)
    # Roll-off: first frequency bin where cumulative power reaches 85% of the total.
    cumulative = np.cumsum(power, axis=1)
    rolloff_idx = np.argmax(cumulative >= 0.85 * total[:, None], axis=1)
    rolloff = freqs[rolloff_idx]
    # Flatness: geometric mean over arithmetic mean of the power spectrum.
    flatness = np.exp(np.mean(np.log(power + eps), axis=1)) / (np.mean(power + eps, axis=1))
    # Zero-crossing rate: share of adjacent sample pairs whose sign bit differs.
    zcr = np.mean(np.abs(np.diff(np.signbit(frames), axis=1)), axis=1)
    rms = np.sqrt(np.mean(frames**2, axis=1) + eps)
    # Spectral entropy, divided by log(number of bins).
    entropy = -(power / total[:, None] * np.log((power / total[:, None]) + eps)).sum(axis=1) / math.log(
        power.shape[1]
    )

    def band_ratio(low: float, high: float) -> np.ndarray:
        # Share of each frame's power that falls in [low, high) Hz.
        mask = (freqs >= low) & (freqs < high)
        if not np.any(mask):
            return np.zeros(power.shape[0])
        return power[:, mask].sum(axis=1) / total

    bands = [
        band_ratio(0, 800),
        band_ratio(800, 1800),
        band_ratio(1800, 3200),
        band_ratio(3200, 5000),
        # Upper edges are capped at the Nyquist frequency for low sample rates.
        band_ratio(5000, min(7900, sr / 2)),
        band_ratio(3500, min(7500, sr / 2)),
    ]
    # Frame-to-frame centroid change; prepend makes the first delta zero.
    deltas = np.diff(centroid, prepend=centroid[0])
    features: list[float] = [
        float(len(audio) / sr),
        float(np.mean(audio)),
        float(np.std(audio)),
        float(np.max(np.abs(audio))),
    ]
    for values in [centroid, bandwidth, rolloff, flatness, zcr, rms, entropy, deltas, *bands]:
        features.extend(summarize_feature_values(values))
    return np.asarray(features, dtype=np.float32)
# Lazily populated caches for the two acoustic hint models; filled by
# load_acoustic_model() and load_acoustic_extratrees_model() on first use.
ACOUSTIC_MODEL: dict[str, Any] | None = None
ACOUSTIC_EXTRATREES_MODEL: dict[str, Any] | None = None
def load_acoustic_model() -> dict[str, Any] | None:
    """Return the JSON k-NN acoustic model, loading and caching it on first use.

    Returns ``None`` when acoustic hints are disabled, or when nothing is
    cached yet and the model file does not exist.
    """
    global ACOUSTIC_MODEL
    if not acoustic_hint_enabled():
        return None
    if ACOUSTIC_MODEL is not None:
        return ACOUSTIC_MODEL
    if not ACOUSTIC_MODEL_PATH.exists():
        return None
    serialized = ACOUSTIC_MODEL_PATH.read_text(encoding="utf-8")
    ACOUSTIC_MODEL = json.loads(serialized)
    return ACOUSTIC_MODEL
def load_acoustic_extratrees_model() -> dict[str, Any] | None:
    """Return the ExtraTrees acoustic model bundle, loading it on first use.

    Returns ``None`` when acoustic hints are disabled or the model file does
    not exist. The file's presence is checked on every call, even when a
    bundle is already cached.
    """
    global ACOUSTIC_EXTRATREES_MODEL
    hints_on = acoustic_hint_enabled()
    if not hints_on or not ACOUSTIC_EXTRATREES_MODEL_PATH.exists():
        return None
    if ACOUSTIC_EXTRATREES_MODEL is not None:
        return ACOUSTIC_EXTRATREES_MODEL
    # Imported lazily so joblib is only required when the model file is shipped.
    import joblib

    # NOTE: joblib.load unpickles the file, so the artifact must be trusted.
    ACOUSTIC_EXTRATREES_MODEL = joblib.load(ACOUSTIC_EXTRATREES_MODEL_PATH)
    return ACOUSTIC_EXTRATREES_MODEL
def classify_acoustic_extratrees(waveform: np.ndarray) -> dict[str, Any] | None:
    """Classify a clip with the ExtraTrees acoustic model.

    Returns:
        ``None`` when the model is unavailable. Otherwise a result dictionary
        with the predicted class, its confidence, per-class scores sorted from
        most to least likely, and model metadata. Confidence is 1.0 when the
        classifier exposes no ``predict_proba``.
    """
    bundle = load_acoustic_extratrees_model()
    if bundle is None:
        return None

    feature_rate = int(bundle.get("sample_rate", 16000))
    feature_row = extract_acoustic_features(waveform, sr=feature_rate).reshape(1, -1)
    classifier = bundle["classifier"]
    predicted = str(classifier.predict(feature_row)[0])

    scores: dict[str, float] = {}
    confidence = 1.0
    if hasattr(classifier, "predict_proba"):
        probabilities = classifier.predict_proba(feature_row)[0]
        labels = [str(label) for label in classifier.classes_]
        by_probability = sorted(zip(labels, probabilities), key=lambda pair: pair[1], reverse=True)
        scores = {label: round(float(probability), 6) for label, probability in by_probability}
        confidence = float(scores.get(predicted, 0.0))

    result: dict[str, Any] = {
        "detected_class": predicted,
        "raw_class": predicted,
        "confidence": confidence,
        "class_scores": scores,
        "model_name": bundle.get("name", "lisper_v18_extratrees_acoustic_hint"),
    }
    for metadata_key in ("train_rows", "feature_count", "holdout_accuracy"):
        result[metadata_key] = bundle.get(metadata_key)
    result["low_confidence_defaulted_to_clear"] = False
    return result
def apply_live_clear_guard(result: dict[str, Any] | None) -> dict[str, Any] | None:
    """Mark ``result`` as not guarded and hand it back.

    The dictionary is modified in place: ``live_clear_guard_applied`` is set
    to ``False``. ``None`` passes through unchanged.
    """
    if result is None:
        return None
    result["live_clear_guard_applied"] = False
    return result
def classify_acoustic_knn(waveform: np.ndarray) -> dict[str, Any] | None:
    """Classify a clip by distance-weighted vote of its nearest exemplars.

    The clip's features are standardised with the model's mean and standard
    deviation, the ``ACOUSTIC_K`` closest exemplars each vote with weight
    ``1 / distance``, and the winning share of the vote is the confidence.
    When that confidence is below ``ACOUSTIC_MIN_CONFIDENCE`` the reported
    class falls back to ``"clear"`` while ``raw_class`` keeps the top label.

    Returns:
        ``None`` when the model is unavailable, otherwise the result dictionary.
    """
    knn_model = load_acoustic_model()
    if knn_model is None:
        return None

    feature_rate = int(knn_model.get("sample_rate", 16000))
    raw_features = extract_acoustic_features(waveform, sr=feature_rate)
    center = np.asarray(knn_model["mean"], dtype=np.float32)
    spread = np.asarray(knn_model["std"], dtype=np.float32)
    # Near-constant features are left unscaled rather than divided by ~0.
    standardized = (raw_features - center) / np.where(spread < 1e-6, 1.0, spread)

    neighbours = sorted(
        (
            (
                float(np.linalg.norm(standardized - np.asarray(exemplar["features"], dtype=np.float32))),
                exemplar["label"],
                exemplar.get("source_id", ""),
            )
            for exemplar in knn_model["exemplars"]
        ),
        key=lambda entry: entry[0],
    )

    votes: dict[str, float] = dict.fromkeys(knn_model["classes"], 0.0)
    for gap, label, _origin in neighbours[:ACOUSTIC_K]:
        # Floor the distance so an exact match cannot divide by zero.
        votes[label] += 1.0 / max(gap, 1e-4)

    ranking = sorted(votes.items(), key=lambda entry: entry[1], reverse=True)
    winner, winner_votes = ranking[0]
    vote_total = sum(votes.values()) or 1.0
    confidence = float(winner_votes / vote_total)

    reported = winner if confidence >= ACOUSTIC_MIN_CONFIDENCE else "clear"
    closest_gap, _closest_label, closest_origin = neighbours[0]
    return {
        "detected_class": reported,
        "raw_class": winner,
        "confidence": confidence,
        "nearest_distance": round(closest_gap, 4),
        "nearest_source_id": closest_origin,
        "class_scores": {label: round(float(score), 6) for label, score in ranking},
        "model_name": knn_model.get("name"),
        "low_confidence_defaulted_to_clear": reported == "clear" and winner != "clear",
    }
def _compact_acoustic_result(result: dict[str, Any] | None) -> dict[str, Any] | None:
    """Return a copy of ``result`` limited to the core reporting fields.

    Fields absent from ``result`` are simply left out; ``None`` passes through.
    """
    if result is None:
        return None
    compact: dict[str, Any] = {}
    for field in (
        "detected_class",
        "raw_class",
        "confidence",
        "nearest_distance",
        "nearest_source_id",
        "class_scores",
        "model_name",
        "low_confidence_defaulted_to_clear",
    ):
        if field in result:
            compact[field] = result[field]
    return compact
def maybe_apply_knn_override(
    extratrees_result: dict[str, Any] | None,
    knn_result: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """Let a very close, very confident k-NN match override the ExtraTrees result.

    Args:
        extratrees_result: Result from the ExtraTrees classifier, or ``None``.
        knn_result: Result from the k-NN classifier, or ``None``.

    Returns:
        * ``extratrees_result`` unchanged when either input is ``None`` or the
          k-NN label is ``"clear"``;
        * a copy of ``extratrees_result`` annotated with
          ``hybrid_override_applied=False`` when the k-NN match misses either
          threshold;
        * otherwise a merged result that carries the k-NN class, confidence
          and scores, plus compact copies of both source results.
    """
    if extratrees_result is None or knn_result is None:
        return extratrees_result
    knn_label = normalize_label(str(knn_result.get("raw_class") or knn_result.get("detected_class") or ""))
    if knn_label == "clear":
        return extratrees_result

    confidence = float(knn_result.get("confidence") or 0.0)
    # A distance of exactly 0.0 is the best possible match and must not be
    # mistaken for "missing": ``value or math.inf`` would turn it into
    # infinity and reject the override. Only an absent value becomes infinity.
    reported_distance = knn_result.get("nearest_distance")
    nearest_distance = math.inf if reported_distance is None else float(reported_distance)

    max_distance = knn_override_max_distance()
    min_confidence = knn_override_min_confidence()
    thresholds = {
        "max_distance": max_distance,
        "min_confidence": min_confidence,
    }

    if confidence < min_confidence or nearest_distance > max_distance:
        return {
            **extratrees_result,
            "hybrid_override_applied": False,
            "hybrid_override_reason": "knn_not_close_enough",
            "hybrid_override_thresholds": thresholds,
            "knn_result": _compact_acoustic_result(knn_result),
        }

    # Keep only scores whose label maps onto an allowed class.
    class_scores = {
        label: float(score)
        for label, score in (knn_result.get("class_scores") or {}).items()
        if normalize_label(str(label)) in ALLOWED_CLASSES
    }
    return {
        **extratrees_result,
        "detected_class": knn_label,
        "raw_class": knn_label,
        "confidence": confidence,
        "class_scores": class_scores,
        "model_name": "lisper_hybrid_extratrees_knn_synthetic_override",
        "low_confidence_defaulted_to_clear": False,
        "hybrid_override_applied": True,
        "hybrid_override_reason": "knn_close_synthetic_exemplar",
        "hybrid_override_thresholds": thresholds,
        "extratrees_result": _compact_acoustic_result(extratrees_result),
        "knn_result": _compact_acoustic_result(knn_result),
    }
def classify_acoustic(waveform: np.ndarray) -> dict[str, Any] | None:
    """Classify a clip with the configured acoustic model(s).

    With an explicit ``"extratrees"`` or ``"knn"`` preference only that
    classifier runs. In ``"auto"`` mode both run: the ExtraTrees result is
    used (possibly overridden by a close k-NN match) when available, and the
    k-NN result is the fallback.
    """
    forced = {
        "extratrees": classify_acoustic_extratrees,
        "knn": classify_acoustic_knn,
    }.get(acoustic_model_preference())
    if forced is not None:
        return forced(waveform)

    trees_outcome = classify_acoustic_extratrees(waveform)
    knn_outcome = classify_acoustic_knn(waveform)
    if trees_outcome is None:
        return knn_outcome
    return maybe_apply_knn_override(trees_outcome, knn_outcome)
def enforce_acoustic_response(response: str, acoustic_result: dict[str, Any] | None) -> tuple[str, dict[str, Any]]:
    """Rewrite a model response so that it reports the acoustic class.

    Without an acoustic result the response is returned as-is together with
    its parsed form. Otherwise the class, reason, and cue come from the
    templates for the acoustic class; only the model's encouragement line is
    kept (with a stock sentence as fallback).

    Returns:
        ``(final_response, parsed)``. When enforcement happened, ``parsed``
        also carries ``raw_model_response`` and ``acoustic_hint_enforced``.

    Raises:
        KeyError: If the acoustic class does not normalise to a key of
            ``CLASS_TEMPLATES``.
    """
    model_parsed = parse_response(response)
    if not acoustic_result:
        return response, model_parsed

    detected_class = normalize_label(str(acoustic_result["detected_class"]))
    guarded = acoustic_result.get("live_clear_guard_applied")
    standard_template = CLASS_TEMPLATES[detected_class]
    if guarded:
        template = GUARDED_CLASS_TEMPLATES.get(detected_class, standard_template)
    else:
        template = standard_template

    encouragement = model_parsed.get("encouragement")
    if not encouragement:
        encouragement = "Good effort. One focused repetition is enough for the next try."

    lines = (
        f"Detected class: {detected_class}",
        f"Reason: {template['reason']}",
        f"Corrective cue: {template['cue']}",
        f"Encouragement: {encouragement}",
    )
    final_response = "\n".join(lines)

    final_parsed = parse_response(final_response)
    final_parsed.update(raw_model_response=response, acoustic_hint_enforced=True)
    return final_response, final_parsed
def build_inconclusive_response(
    decision: dict[str, Any],
    acoustic_result: dict[str, Any] | None,
    clip_diagnostics: dict[str, Any],
) -> tuple[str, dict[str, Any]]:
    """Build the user-facing response for a clip that receives no class label.

    A decision with status ``"error"`` yields a one-line "analysis
    unavailable" message; any other status yields the four-line format with
    class ``inconclusive`` and the decision's reason.

    Returns:
        ``(response, parsed)`` where ``parsed`` also records the status,
        diagnostics, acoustic result, and gate decision.
    """
    if decision.get("status") == "error":
        response = "Analysis unavailable. The acoustic model is not loaded, so Lisper will not guess a class."
    else:
        reason = str(decision.get("decision_reason") or "The clip was not reliable enough to classify.")
        lines = (
            "Detected class: inconclusive",
            f"Reason: {reason}",
            "Corrective cue: Record one clear phrase with /s/ or /z/ sounds, close to the microphone, then try again.",
            "Encouragement: The clip was captured; we just need a cleaner attempt before giving a label.",
        )
        response = "\n".join(lines)

    parsed = parse_response(response)
    parsed.update(
        status=str(decision.get("status") or "inconclusive"),
        raw_model_response=None,
        acoustic_hint_enforced=False,
        audio_diagnostics=clip_diagnostics,
        acoustic_analysis=acoustic_result,
        live_audio_gate=decision,
    )
    return response, parsed
def build_detected_acoustic_response(
    acoustic_result: dict[str, Any],
    decision: dict[str, Any],
    clip_diagnostics: dict[str, Any],
) -> tuple[str, dict[str, Any]]:
    """Build the templated response for a class accepted by the live gate.

    The class comes from the decision, falling back to the acoustic result.
    If it does not resolve to a templated class, the clip is reported as
    inconclusive instead.

    Returns:
        ``(response, parsed)`` where ``parsed`` is marked ``"detected"`` and
        records that text generation was skipped, plus the diagnostics,
        acoustic result, and gate decision.
    """
    label_source = decision.get("detected_class") or acoustic_result.get("detected_class") or ""
    detected_class = normalize_label(str(label_source))

    if detected_class not in CLASS_TEMPLATES or detected_class == "inconclusive":
        fallback_decision = {
            **decision,
            "status": "inconclusive",
            "decision_reason": "The live gate did not produce a valid class label.",
        }
        return build_inconclusive_response(fallback_decision, acoustic_result, clip_diagnostics)

    template = CLASS_TEMPLATES[detected_class]
    lines = (
        f"Detected class: {detected_class}",
        f"Reason: {template['reason']}",
        f"Corrective cue: {template['cue']}",
        "Encouragement: Nice work getting a usable recording. Try one focused repetition next.",
    )
    response = "\n".join(lines)

    parsed = parse_response(response)
    parsed.update(
        status="detected",
        raw_model_response=None,
        acoustic_hint_enforced=True,
        gemma_generation_skipped=True,
        audio_diagnostics=clip_diagnostics,
        acoustic_analysis=acoustic_result,
        live_audio_gate=decision,
    )
    return response, parsed
def build_audio_only_inconclusive_decision(clip_diagnostics: dict[str, Any]) -> dict[str, Any] | None:
    """Decide, from diagnostics alone, whether a clip lacks sibilant evidence.

    Returns:
        ``None`` when the clip's ``sibilant_frame_ratio`` meets the policy
        minimum. Otherwise an ``"inconclusive"`` decision that records every
        policy threshold and notes that the classifier was skipped.
    """
    policy = live_audio_policy()
    enough_sibilance = clip_diagnostics["sibilant_frame_ratio"] >= policy.min_sibilant_frame_ratio
    if enough_sibilance:
        return None

    threshold_fields = (
        "min_audio_seconds",
        "min_peak",
        "min_rms",
        "min_voiced_ratio",
        "min_speech_frame_ratio",
        "min_tonal_frame_ratio",
        "min_sibilant_frame_ratio",
        "max_noise_flatness",
        "max_clipping_ratio",
        "clear_min_confidence",
        "clear_min_margin",
        "nonclear_min_confidence",
        "nonclear_min_margin",
    )
    thresholds = {field: getattr(policy, field) for field in threshold_fields}

    return {
        "status": "inconclusive",
        "detected_class": "inconclusive",
        "candidate_class": None,
        "decision_reason": "The clip has speech energy, but not enough usable /s/ or /z/ airflow evidence.",
        "thresholds": thresholds,
        "audio_diagnostics": clip_diagnostics,
        "classifier": {"available": False, "skipped": "insufficient_sibilant_evidence"},
    }
def audio_token_id(processor: Any) -> int | None:
value = getattr(processor, "audio_token_id", None)
if value is not None:
return int(value)
tokenizer = getattr(processor, "tokenizer", None)
token = getattr(processor, "audio_token", None) or getattr(tokenizer, "audio_token", None)
if tokenizer is not None and token is not None:
return int(tokenizer.convert_tokens_to_ids(token))
return None
def replace_audio_token_run(input_ids: torch.Tensor, token_id: int, count: int) -> tuple[torch.Tensor, dict[str, int]]:
    """Resize the first contiguous run of ``token_id`` to exactly ``count`` tokens.

    Args:
        input_ids: Token id sequence; expected to be a 1-D tensor.
        token_id: Id of the placeholder token whose run is resized.
        count: Desired length of the run.

    Returns:
        ``(aligned_ids, metadata)``. ``metadata`` reports the original run
        length and the requested length. When ``token_id`` does not occur,
        ``input_ids`` is returned unchanged with an original length of 0.
        Only the first run is touched; later occurrences are left as they are.
    """
    hits = torch.nonzero(input_ids == token_id, as_tuple=False).flatten()
    if hits.numel() == 0:
        return input_ids, {"original_audio_tokens": 0, "aligned_audio_tokens": count}

    run_start = int(hits[0].item())
    # The first run ends at the first place where consecutive hit positions
    # stop being adjacent; with no such gap, every hit belongs to the run.
    gaps = torch.nonzero((hits[1:] - hits[:-1]) != 1, as_tuple=False).flatten()
    run_length = int(gaps[0].item()) + 1 if gaps.numel() else int(hits.numel())
    run_end = run_start + run_length

    filler = torch.full((count,), token_id, dtype=input_ids.dtype, device=input_ids.device)
    resized = torch.cat([input_ids[:run_start], filler, input_ids[run_end:]], dim=0)
    return resized, {"original_audio_tokens": run_length, "aligned_audio_tokens": count}
def model_inference_dtype(model: Any) -> torch.dtype:
    """Pick the dtype to use for inference tensors fed to ``model``.

    Prefers ``model.dtype``; otherwise uses ``model.base_model.dtype`` when
    that is set (truthy); otherwise falls back to ``torch_dtype()``.
    """
    own_dtype = getattr(model, "dtype", None)
    if own_dtype is None:
        wrapped = getattr(model, "base_model", None)
        wrapped_dtype = getattr(wrapped, "dtype", None)
        return wrapped_dtype if wrapped_dtype else torch_dtype()
    return own_dtype
def module_parameter_dtype(module: Any) -> torch.dtype:
    """Return the dtype of the first parameter yielded by ``module.parameters()``.

    When the parameter iterator is empty, defers to
    ``model_inference_dtype(module)``.
    """
    no_parameters = object()
    first_parameter = next(module.parameters(), no_parameters)
    if first_parameter is no_parameters:
        return model_inference_dtype(module)
    return first_parameter.dtype
def audio_input_dtype(model: Any) -> torch.dtype:
    """Choose the dtype for the audio ``input_features`` tensor.

    Precedence:
      1. ``LISPER_ZERO_GPU_AUDIO_DTYPE`` (trimmed, case-insensitive) set to
         ``float16``, ``float32`` or ``bfloat16``.
      2. ``torch.bfloat16`` whenever ``adapter_id()`` is truthy.
      3. The parameter dtype of the model's audio feature module.
    """
    explicit_choices = {
        "float16": torch.float16,
        "float32": torch.float32,
        "bfloat16": torch.bfloat16,
    }
    override = os.environ.get("LISPER_ZERO_GPU_AUDIO_DTYPE", "").strip().lower()
    if override in explicit_choices:
        return explicit_choices[override]
    if adapter_id():
        return torch.bfloat16
    return module_parameter_dtype(audio_feature_module(model))
def audio_feature_module(model: Any) -> Any:
    """Find the object exposing ``get_audio_features`` on a possibly wrapped model.

    Candidates are probed in this order and the first match wins:
    ``model``, ``model.model``, ``model.base_model``,
    ``model.base_model.model``, ``model.base_model.model.model``.

    Raises:
        AttributeError: If none of the candidates has ``get_audio_features``.
    """
    base = getattr(model, "base_model", None)
    base_inner = getattr(base, "model", None)
    search_order = (
        model,
        getattr(model, "model", None),
        base,
        base_inner,
        getattr(base_inner, "model", None),
    )
    match = next(
        (
            candidate
            for candidate in search_order
            if candidate is not None and hasattr(candidate, "get_audio_features")
        ),
        None,
    )
    if match is None:
        raise AttributeError("Could not locate Gemma audio feature module on loaded model.")
    return match
def summarize_inputs(inputs: Any) -> dict[str, Any]:
    """Describe each entry of ``inputs`` in a JSON-friendly form for diagnostics.

    Values exposing both ``shape`` and ``dtype`` are reported as
    ``{"shape", "dtype", "device"}`` (device is ``""`` when the value has
    none); every other value is reported by its type name only.
    """

    def describe(value: Any) -> dict[str, Any]:
        looks_like_tensor = hasattr(value, "shape") and hasattr(value, "dtype")
        if not looks_like_tensor:
            return {"type": type(value).__name__}
        return {
            "shape": [int(dim) for dim in value.shape],
            "dtype": str(value.dtype),
            "device": str(getattr(value, "device", "")),
        }

    return {key: describe(value) for key, value in dict(inputs).items()}
def align_audio_placeholders(inputs: Any, processor: Any, model: Any) -> tuple[Any, dict[str, int]]:
    """Make the prompt's audio placeholder run match the encoder's token count.

    The audio features are run through the model's audio feature module, the
    valid encoded positions are counted from the returned attention mask, and
    the first run of audio placeholder tokens in ``input_ids`` is resized to
    that count. ``attention_mask`` (and ``mm_token_type_ids`` when present
    and the processor can rebuild them) are regenerated for the new length.

    Args:
        inputs: Processor output mapping; modified in place when realignment
            is needed.
        processor: Processor used to resolve the audio token id.
        model: Loaded model used to encode the audio features.

    Returns:
        ``(inputs, metadata)``. ``metadata`` is empty when there is nothing
        to align, flags a skip when alignment is disabled, and otherwise
        carries the encoded/original/aligned token counts.
    """
    if not audio_alignment_enabled():
        return inputs, {"audio_alignment_skipped": 1}

    has_audio_tensors = "input_features" in inputs and "input_features_mask" in inputs
    if not has_audio_tensors:
        return inputs, {}

    placeholder_id = audio_token_id(processor)
    if placeholder_id is None:
        return inputs, {}

    with torch.inference_mode():
        encoded = audio_feature_module(model).get_audio_features(
            inputs["input_features"],
            inputs["input_features_mask"],
            return_dict=True,
        )
    encoded_count = int(encoded.attention_mask.sum().item())

    # Realignment is only attempted for a single-sequence batch with at least
    # one encoded audio position.
    can_align = encoded_count > 0 and inputs["input_ids"].shape[0] == 1
    if not can_align:
        return inputs, {"encoded_audio_tokens": encoded_count}

    aligned_ids, metadata = replace_audio_token_run(inputs["input_ids"][0], placeholder_id, encoded_count)
    metadata["encoded_audio_tokens"] = encoded_count
    if metadata["original_audio_tokens"] == encoded_count:
        # Placeholder run already has the right length; leave inputs untouched.
        return inputs, metadata

    batched_ids = aligned_ids.unsqueeze(0)
    inputs["input_ids"] = batched_ids
    inputs["attention_mask"] = torch.ones_like(batched_ids)

    can_rebuild_types = "mm_token_type_ids" in inputs and hasattr(processor, "create_mm_token_type_ids")
    if can_rebuild_types:
        rebuilt_types = processor.create_mm_token_type_ids(batched_ids.detach().cpu())
        inputs["mm_token_type_ids"] = torch.as_tensor(
            rebuilt_types,
            dtype=batched_ids.dtype,
            device=batched_ids.device,
        )
    return inputs, metadata
def _analyze_impl(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]:
    """Run the analysis pipeline for one clip.

    Stages, in order:
      1. Normalize the audio and validate it for analysis.
      2. Return an inconclusive result when the clip lacks sibilant evidence.
      3. Classify acoustically and apply the live gate; any decision whose
         status is not ``"detected"`` is returned as inconclusive.
      4. When Gemma generation is disabled, return the acoustic-only response.
      5. Otherwise build the chat prompt, move/cast the tensors, align the
         audio placeholders, generate, and enforce the acoustic class on the
         decoded text.

    Side effects:
        Updates the module-level ``LAST_INPUT_SUMMARY`` just before generation.

    Returns:
        ``(response_text, parsed_json)`` where ``parsed_json`` is the parsed
        payload serialized with two-space indentation.

    Errors raised by the helpers are not handled here; they propagate to the
    caller.
    """
    global LAST_INPUT_SUMMARY

    def as_ui_result(text: str, payload: dict[str, Any]) -> tuple[str, str]:
        return text, json.dumps(payload, indent=2)

    waveform = normalize_audio(audio)
    clip_diagnostics = validate_audio_for_analysis(waveform)

    # Cheap audio-only gate: skip the classifier when there is no usable sibilance.
    audio_only_decision = build_audio_only_inconclusive_decision(clip_diagnostics)
    if audio_only_decision is not None:
        return as_ui_result(*build_inconclusive_response(audio_only_decision, None, clip_diagnostics))

    acoustic_result = classify_acoustic(waveform)
    live_decision = decide_live_analysis(acoustic_result, clip_diagnostics, live_audio_policy())
    if live_decision["status"] != "detected":
        return as_ui_result(*build_inconclusive_response(live_decision, acoustic_result, clip_diagnostics))

    if not gemma_generation_enabled():
        return as_ui_result(*build_detected_acoustic_response(acoustic_result, live_decision, clip_diagnostics))

    audio_url = write_temp_audio(waveform)
    processor, model = load_runtime()
    inputs = processor.apply_chat_template(
        build_messages(target_text, audio_url, acoustic_result),
        tokenize=True,
        add_generation_prompt=True,
        enable_thinking=False,
        return_dict=True,
        return_tensors="pt",
    )

    device = next(model.parameters()).device
    if hasattr(inputs, "to"):
        inputs = inputs.to(device)

    # Floating-point feature tensors follow the model's inference dtype.
    for tensor_key in ("input_features", "pixel_values", "pixel_values_videos"):
        if tensor_key not in inputs:
            continue
        tensor = inputs[tensor_key]
        if hasattr(tensor, "to"):
            inputs[tensor_key] = tensor.to(dtype=model_inference_dtype(model))

    # Audio features are then re-cast to the dtype chosen for the audio path.
    if "input_features" in inputs:
        inputs["input_features"] = inputs["input_features"].to(dtype=audio_input_dtype(model))

    if "input_features_mask" in inputs:
        feature_mask = inputs["input_features_mask"]
        if hasattr(feature_mask, "to"):
            inputs["input_features_mask"] = feature_mask.to(device=device)

    inputs, alignment = align_audio_placeholders(inputs, processor, model)
    LAST_INPUT_SUMMARY = summarize_inputs(inputs)

    with torch.inference_mode():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens(),
            do_sample=False,
            use_cache=True,
        )

    # Decode only the tokens produced after the prompt.
    prompt_length = inputs["input_ids"].shape[1]
    generated_text = processor.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    raw_response = strip_generation_artifacts(generated_text)

    response, parsed = enforce_acoustic_response(raw_response, acoustic_result)
    parsed["status"] = "detected"
    parsed["audio_token_alignment"] = alignment
    parsed["audio_diagnostics"] = clip_diagnostics
    parsed["acoustic_analysis"] = acoustic_result
    parsed["live_audio_gate"] = live_decision
    return as_ui_result(response, parsed)
def analyze_with_errors(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]:
    """Run ``_analyze_impl`` and turn failures into displayable results.

    Returns:
        ``(message, json_payload)``. On success this is the pipeline output.
        An ``InvalidAudioError`` yields a "rejected_audio" payload with the
        exception's diagnostics. Any other exception yields a diagnostic
        payload (error type, message, runtime configuration, last input
        summary, and a traceback limited to 8 frames) instead of propagating.
    """
    try:
        return _analyze_impl(audio, target_text)
    except InvalidAudioError as exc:
        rejection = {
            "status": "rejected_audio",
            "reason": str(exc),
            "audio_diagnostics": exc.diagnostics,
        }
        user_message = "Recording not usable yet. Please record a clear speech clip before analysis."
        return user_message, json.dumps(rejection, indent=2)
    except Exception as exc:
        # Top-level UI boundary: report the failure rather than raising.
        error_name = type(exc).__name__
        failure_report = {
            "error_type": error_name,
            "message": str(exc),
            "model_id": model_id(),
            "adapter_id": adapter_id() or None,
            "dtype": os.environ.get("LISPER_ZERO_GPU_DTYPE", "float16"),
            "load_in_4bit": load_in_4bit_enabled(),
            "acoustic_hint_enabled": acoustic_hint_enabled(),
            "audio_alignment_enabled": audio_alignment_enabled(),
            "zero_gpu_size": zero_gpu_size(),
            "input_summary": LAST_INPUT_SUMMARY,
            "traceback": traceback.format_exc(limit=8),
        }
        headline = f"ZeroGPU inference failed: {error_name}: {exc}"
        return headline, json.dumps(failure_report, indent=2)
def analyze(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]:
    """Analyze one clip; thin alias that forwards to ``analyze_with_errors``."""
    result = analyze_with_errors(audio, target_text)
    return result
@spaces.GPU(duration=5, size=zero_gpu_size())
def zero_gpu_healthcheck() -> str:
    """Return ``"ok"`` from a function wrapped by ``spaces.GPU``.

    NOTE(review): presumably exists so the Space exposes at least one
    GPU-decorated function; confirm against the ZeroGPU requirements.
    """
    status = "ok"
    return status
def analysis_started(browser_recording_payload: str, uploaded_audio: str | tuple[int, np.ndarray] | None) -> tuple[str, str]:
if not browser_recording_payload.strip() and uploaded_audio is None:
return (
"No clip ready yet. Use the browser recorder or upload a short speech clip first.",
json.dumps({"status": "waiting_for_audio"}, indent=2),
)
return (
"Checking the recording quality and acoustic evidence...",
json.dumps({"status": "running", "stage": "audio_preflight_then_acoustic_gate"}, indent=2),
)
def analyze_ui(
browser_recording_payload: str,
uploaded_audio: str | tuple[int, np.ndarray] | None,
target_text: str,
) -> tuple[str, str]:
selected_audio: str | tuple[int, np.ndarray] | None = (
browser_recording_payload.strip() if browser_recording_payload.strip() else uploaded_audio
)
if selected_audio is None:
return (
"No clip ready yet. Use the browser recorder or upload a short speech clip first.",
json.dumps({"status": "waiting_for_audio"}, indent=2),
)
return analyze_with_errors(selected_audio, target_text)
def build_app() -> gr.Blocks:
    """Assemble the Gradio Blocks UI and register its event handlers.

    Layout: a left column with the browser recorder controls, the upload
    fallback, the expected-text box and the Analyze button; a right column
    with the response text and parsed JSON; then a summary of the configured
    runtime options.

    Events are registered in a fixed order: the three recorder buttons
    (client-side JavaScript only), the upload change/clear status updates,
    and the Analyze click, which first posts an immediate status and then
    runs the analysis under the ``analyze`` API name.

    Returns:
        The constructed ``gr.Blocks`` instance.
    """
    # Options shared by every event that should run immediately, without
    # queueing or a progress indicator.
    instant_event = {"queue": False, "show_progress": "hidden"}

    with gr.Blocks(title="Lisper ZeroGPU", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
# Lisper ZeroGPU
Server-side Gemma 4 audio analysis for users whose browser cannot comfortably run the WebGPU model.
The currently validated fine-tuned Lisper model is Gemma 4 E2B. E4B and 31B are future model targets and should be deployed as separate revisions after training/eval.
"""
        )

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Browser recorder")
                # Hidden field that the recorder JavaScript writes into.
                browser_recording_payload = gr.Textbox(
                    label="Browser recorder payload",
                    visible=False,
                    elem_id="lisper-browser-recorder-payload",
                )
                browser_recording_status = gr.Markdown(
                    "No browser recording ready. This path bypasses Gradio's microphone recorder."
                )
                browser_recording_playback = gr.HTML("")
                with gr.Row():
                    browser_record = gr.Button("Record", variant="primary")
                    browser_stop = gr.Button("Stop")
                    browser_clear = gr.Button("Clear")

                gr.Markdown("### Upload fallback")
                audio = gr.Audio(
                    sources=["upload"],
                    type="filepath",
                    label="Speech clip upload",
                    editable=False,
                    waveform_options=gr.WaveformOptions(show_recording_waveform=False),
                )
                audio_status = gr.Markdown(
                    "No uploaded clip ready. Use the browser recorder above, or upload an audio file here."
                )
                target_text = gr.Textbox(
                    label="Expected text",
                    placeholder="Example: Sally sells seashells.",
                    lines=2,
                )
                run = gr.Button("Analyze", variant="primary")

            with gr.Column(scale=1):
                output = gr.Textbox(label="Gemma response", lines=8)
                parsed = gr.Code(label="Parsed JSON", language="json")

        gr.Markdown(
            f"""
**Configured model:** `{model_id()}`
**Configured adapter:** `{adapter_id() or "none"}`
**Adapter 4-bit load:** `{load_in_4bit_enabled()}`
**Acoustic hint:** `{acoustic_hint_enabled()}`
**Audio token alignment:** `{audio_alignment_enabled()}`
**ZeroGPU size:** `{zero_gpu_size()}`
If this Space errors on private or gated models, add `HF_TOKEN` as a Space secret. For local development without downloading the model, set `LISPER_ZERO_GPU_EAGER_LOAD=0`.
"""
        )

        # Recorder buttons have no Python handler; each runs its JavaScript
        # snippet and writes to the same three components.
        recorder_actions = (
            (browser_record, [browser_recording_payload], BROWSER_RECORDER_START_JS),
            (browser_stop, [], BROWSER_RECORDER_STOP_JS),
            (browser_clear, [], BROWSER_RECORDER_CLEAR_JS),
        )
        for button, script_inputs, script in recorder_actions:
            button.click(
                None,
                inputs=script_inputs,
                outputs=[browser_recording_payload, browser_recording_status, browser_recording_playback],
                js=script,
                **instant_event,
            )

        audio.change(
            lambda: "Uploaded clip ready. Analyze is available.",
            inputs=[],
            outputs=[audio_status],
            **instant_event,
        )
        audio.clear(
            lambda: "No uploaded clip ready. Use the browser recorder above, or upload an audio file here.",
            inputs=[],
            outputs=[audio_status],
            **instant_event,
        )

        # Two-step click: post an immediate status, then run the analysis.
        status_step = run.click(
            analysis_started,
            inputs=[browser_recording_payload, audio],
            outputs=[output, parsed],
            queue=False,
        )
        status_step.then(
            analyze_ui,
            inputs=[browser_recording_payload, audio, target_text],
            outputs=[output, parsed],
            api_name="analyze",
        )

    return demo
# Built at import time so the app object exists as the module-level ``demo``.
demo = build_app()

if __name__ == "__main__":
    # Direct execution: enable the queue with a default concurrency limit of
    # one, then launch with errors shown in the UI.
    queued_demo = demo.queue(default_concurrency_limit=1)
    queued_demo.launch(show_error=True)