from __future__ import annotations import base64 import io import json import math import os import re import tempfile import traceback from typing import Any from pathlib import Path # Unsloth's compiled Gemma 4 audio path can trip TorchDynamo on ZeroGPU's # runtime torch build. Keep inference eager for reliability. os.environ.setdefault("TORCHDYNAMO_DISABLE", "1") os.environ.setdefault("UNSLOTH_COMPILE_DISABLE", "1") import gradio as gr import librosa import numpy as np import soundfile as sf import spaces import torch from transformers import AutoProcessor, Gemma4ForConditionalGeneration from live_audio_policy import ( LiveAudioPolicy, compute_live_audio_diagnostics, decide_live_analysis, validate_live_audio_diagnostics, ) ALLOWED_CLASSES = {"clear", "frontal", "lateral", "dental", "palatal"} DEFAULT_MODEL_ID = "thomasjvu/lisper-gemma4-e2b-audio-full" DEFAULT_ADAPTER_ID = "" SPACE_ROOT = Path(__file__).resolve().parent ACOUSTIC_MODEL_PATH = SPACE_ROOT / "acoustic_model.json" ACOUSTIC_EXTRATREES_MODEL_PATH = SPACE_ROOT / "acoustic_extratrees_v18.joblib" ACOUSTIC_K = 5 ACOUSTIC_MIN_CONFIDENCE = 0.42 KNN_OVERRIDE_MAX_DISTANCE = 0.25 KNN_OVERRIDE_MIN_CONFIDENCE = 0.90 LIVE_CLEAR_MIN_CONFIDENCE = 0.85 LIVE_CLEAR_MIN_MARGIN = 0.25 LIVE_NONCLEAR_MIN_CONFIDENCE = 0.55 LIVE_NONCLEAR_MIN_MARGIN = 0.12 MIN_AUDIO_SECONDS = 0.45 MIN_AUDIO_RMS = 0.0015 MIN_AUDIO_PEAK = 0.012 MIN_VOICED_RATIO = 0.002 MIN_SPEECH_FRAME_RATIO = 0.04 MIN_TONAL_FRAME_RATIO = 0.04 MIN_SIBILANT_FRAME_RATIO = 0.015 MAX_NOISE_FLATNESS = 0.40 MAX_CLIPPING_RATIO = 0.08 DEFAULT_PROMPT = """Analyze this pronunciation attempt for lisp type and give concise corrective coaching. Return exactly four labeled lines in this order: Detected class: clear|frontal|lateral|dental|palatal Reason: one brief reason tied to tongue placement or airflow Corrective cue: one concrete next-step cue Encouragement: one brief supportive line""" CLASS_TEMPLATES = { "clear": { "reason": "The acoustic pattern did not strongly match the trained lisp-pattern examples, so this is treated as a tentative clear result.", "cue": "Repeat once at a relaxed pace and keep the airflow centered through the front of the mouth.", }, "dental": { "reason": "The acoustic pattern is closest to the dental examples, where tongue contact near the teeth can narrow the /s/ groove.", "cue": "Relax the tongue slightly off the teeth and keep a narrow stream of air moving forward.", }, "frontal": { "reason": "The acoustic pattern is closest to the frontal examples, which often sound like the airflow is too far forward.", "cue": "Keep the tongue tip just behind the upper teeth and send the air straight forward through a small groove.", }, "lateral": { "reason": "The acoustic pattern is closest to the lateral examples, where air may be escaping around the sides of the tongue.", "cue": "Start from a light /t/ position, seal the tongue sides, and let the air move forward through the center.", }, "palatal": { "reason": "The acoustic pattern is closest to the palatal examples, where the tongue can sit too far back and muffle the sound.", "cue": "Bring the tongue tip slightly forward behind the upper teeth and brighten the airflow.", }, } GUARDED_CLASS_TEMPLATES = { "dental": { "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is dental, which can happen when the tongue presses too close to the teeth.", "cue": "Try one slower repetition with the tongue relaxed just behind the teeth and the air moving forward through a narrow center groove.", }, "frontal": { "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is frontal, which can happen when the tongue or airflow moves too far forward.", "cue": "Keep the tongue tip behind the upper teeth and avoid letting it push between the teeth during /s/ sounds.", }, "lateral": { "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is lateral, where air may be leaking around the tongue sides.", "cue": "Seal the tongue sides lightly against the upper molars and aim the air straight down the middle.", }, "palatal": { "reason": "The acoustic model was not confident enough to call this clear; the nearest non-clear pattern is palatal, where the tongue may be sitting too far back.", "cue": "Bring the tongue tip forward just behind the upper teeth and brighten the /s/ airflow.", }, } BROWSER_RECORDER_START_JS = r""" async (payload) => { const state = window.__lisperRecorder || {}; if (state.recording) { return [payload || "", "Recording is already active.", ""]; } if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) { return [payload || "", "This browser cannot access microphone recording.", ""]; } const stream = await navigator.mediaDevices.getUserMedia({ audio: { channelCount: 1, echoCancellation: false, noiseSuppression: false, autoGainControl: true, }, video: false, }); const AudioContextCtor = window.AudioContext || window.webkitAudioContext; const audioContext = new AudioContextCtor({ sampleRate: 16000 }); await audioContext.resume(); const source = audioContext.createMediaStreamSource(stream); const processor = audioContext.createScriptProcessor(4096, 1, 1); const silentGain = audioContext.createGain(); silentGain.gain.value = 0; const chunks = []; let peak = 0; let sumSquares = 0; let sampleCount = 0; processor.onaudioprocess = (event) => { if (!window.__lisperRecorder?.recording) { return; } const input = event.inputBuffer.getChannelData(0); const copy = new Float32Array(input.length); copy.set(input); chunks.push(copy); for (let i = 0; i < copy.length; i += 1) { const value = copy[i]; const absValue = Math.abs(value); if (absValue > peak) peak = absValue; sumSquares += value * value; } sampleCount += copy.length; }; source.connect(processor); processor.connect(silentGain); silentGain.connect(audioContext.destination); window.__lisperRecorder = { recording: true, stream, audioContext, source, processor, silentGain, chunks, startedAt: Date.now(), getStats: () => ({ peak, rms: sampleCount ? Math.sqrt(sumSquares / sampleCount) : 0, sampleCount, sampleRate: audioContext.sampleRate, }), }; return ["", "Recording through Web Audio... press Stop when finished.", ""]; } """ BROWSER_RECORDER_STOP_JS = r""" async () => { const state = window.__lisperRecorder; if (!state || !state.recording) { return ["", "No active browser recording. Press Record first.", ""]; } state.recording = false; try { state.processor.disconnect(); } catch (_) {} try { state.source.disconnect(); } catch (_) {} try { state.silentGain.disconnect(); } catch (_) {} for (const track of state.stream.getTracks()) { track.stop(); } const stats = state.getStats(); const sampleRate = stats.sampleRate || 16000; const totalLength = state.chunks.reduce((sum, chunk) => sum + chunk.length, 0); const samples = new Float32Array(totalLength); let offset = 0; for (const chunk of state.chunks) { samples.set(chunk, offset); offset += chunk.length; } await state.audioContext.close().catch(() => undefined); window.__lisperRecorder = null; function writeString(view, byteOffset, string) { for (let i = 0; i < string.length; i += 1) { view.setUint8(byteOffset + i, string.charCodeAt(i)); } } function encodeWav(floatSamples, wavSampleRate) { const bytesPerSample = 2; const blockAlign = bytesPerSample; const buffer = new ArrayBuffer(44 + floatSamples.length * bytesPerSample); const view = new DataView(buffer); writeString(view, 0, "RIFF"); view.setUint32(4, 36 + floatSamples.length * bytesPerSample, true); writeString(view, 8, "WAVE"); writeString(view, 12, "fmt "); view.setUint32(16, 16, true); view.setUint16(20, 1, true); view.setUint16(22, 1, true); view.setUint32(24, wavSampleRate, true); view.setUint32(28, wavSampleRate * blockAlign, true); view.setUint16(32, blockAlign, true); view.setUint16(34, 16, true); writeString(view, 36, "data"); view.setUint32(40, floatSamples.length * bytesPerSample, true); let byteOffset = 44; for (let i = 0; i < floatSamples.length; i += 1, byteOffset += 2) { const clamped = Math.max(-1, Math.min(1, floatSamples[i])); view.setInt16(byteOffset, clamped < 0 ? clamped * 0x8000 : clamped * 0x7fff, true); } return new Blob([view], { type: "audio/wav" }); } const blob = encodeWav(samples, sampleRate); const dataUrl = await new Promise((resolve, reject) => { const reader = new FileReader(); reader.onload = () => resolve(reader.result); reader.onerror = () => reject(reader.error); reader.readAsDataURL(blob); }); const durationSeconds = sampleRate ? samples.length / sampleRate : 0; const payload = JSON.stringify({ source: "browser-web-audio-wav", data_url: dataUrl, mime_type: "audio/wav", sample_rate: sampleRate, sample_count: samples.length, duration_seconds: Number(durationSeconds.toFixed(3)), peak: Number(stats.peak.toFixed(6)), rms: Number(stats.rms.toFixed(6)), created_at: new Date().toISOString(), }); const status = stats.peak < 0.003 ? `Clip captured but appears very quiet. peak=${stats.peak.toFixed(6)} rms=${stats.rms.toFixed(6)}. Check browser microphone permission/input.` : `Clip ready: ${durationSeconds.toFixed(1)}s, peak=${stats.peak.toFixed(3)}. Playback should contain your voice.`; const playback = ``; return [payload, status, playback]; } """ BROWSER_RECORDER_CLEAR_JS = r""" async () => { const state = window.__lisperRecorder; if (state?.recording) { state.recording = false; try { state.processor.disconnect(); } catch (_) {} try { state.source.disconnect(); } catch (_) {} try { state.silentGain.disconnect(); } catch (_) {} for (const track of state.stream.getTracks()) { track.stop(); } await state.audioContext.close().catch(() => undefined); } window.__lisperRecorder = null; return ["", "No browser recording ready.", ""]; } """ class InvalidAudioError(ValueError): """Raised when a clip is too short or too quiet to analyze honestly.""" def __init__(self, message: str, diagnostics: dict[str, Any]): super().__init__(message) self.diagnostics = diagnostics def env_int(name: str, default: int) -> int: try: return int(os.environ.get(name, str(default))) except ValueError: return default def env_float(name: str, default: float) -> float: try: return float(os.environ.get(name, str(default))) except ValueError: return default def model_id() -> str: return os.environ.get("LISPER_ZERO_GPU_MODEL_ID", DEFAULT_MODEL_ID).strip() or DEFAULT_MODEL_ID def adapter_id() -> str: return os.environ.get("LISPER_ZERO_GPU_ADAPTER_ID", DEFAULT_ADAPTER_ID).strip() def max_new_tokens() -> int: return env_int("LISPER_ZERO_GPU_MAX_NEW_TOKENS", 96) def max_seq_length() -> int: return env_int("LISPER_ZERO_GPU_MAX_SEQ_LENGTH", 2048) def zero_gpu_size() -> str: requested = os.environ.get("LISPER_ZERO_GPU_SIZE", "large").strip().lower() return "xlarge" if requested == "xlarge" else "large" def eager_load_enabled() -> bool: return os.environ.get("LISPER_ZERO_GPU_EAGER_LOAD", "0").strip() != "0" def load_in_4bit_enabled() -> bool: default = "1" if adapter_id() else "0" return os.environ.get("LISPER_ZERO_GPU_LOAD_IN_4BIT", default).strip() != "0" def acoustic_hint_enabled() -> bool: return os.environ.get("LISPER_ZERO_GPU_ACOUSTIC_HINT", "1").strip() != "0" def acoustic_model_preference() -> str: requested = os.environ.get("LISPER_ZERO_GPU_ACOUSTIC_MODEL", "auto").strip().lower() if requested in {"extratrees", "knn"}: return requested return "auto" def live_clear_min_confidence() -> float: return env_float("LISPER_ZERO_GPU_LIVE_CLEAR_MIN_CONFIDENCE", LIVE_CLEAR_MIN_CONFIDENCE) def live_clear_min_margin() -> float: return env_float("LISPER_ZERO_GPU_LIVE_CLEAR_MIN_MARGIN", LIVE_CLEAR_MIN_MARGIN) def live_nonclear_min_confidence() -> float: return env_float( "LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_CONFIDENCE", env_float("LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_SCORE", LIVE_NONCLEAR_MIN_CONFIDENCE), ) def live_nonclear_min_margin() -> float: return env_float("LISPER_ZERO_GPU_LIVE_NONCLEAR_MIN_MARGIN", LIVE_NONCLEAR_MIN_MARGIN) def knn_override_max_distance() -> float: return env_float("LISPER_ZERO_GPU_KNN_OVERRIDE_MAX_DISTANCE", KNN_OVERRIDE_MAX_DISTANCE) def knn_override_min_confidence() -> float: return env_float("LISPER_ZERO_GPU_KNN_OVERRIDE_MIN_CONFIDENCE", KNN_OVERRIDE_MIN_CONFIDENCE) def live_audio_policy() -> LiveAudioPolicy: return LiveAudioPolicy( min_audio_seconds=env_float("LISPER_ZERO_GPU_MIN_AUDIO_SECONDS", MIN_AUDIO_SECONDS), min_peak=env_float("LISPER_ZERO_GPU_MIN_AUDIO_PEAK", MIN_AUDIO_PEAK), min_rms=env_float("LISPER_ZERO_GPU_MIN_AUDIO_RMS", MIN_AUDIO_RMS), min_voiced_ratio=env_float("LISPER_ZERO_GPU_MIN_VOICED_RATIO", MIN_VOICED_RATIO), min_speech_frame_ratio=env_float("LISPER_ZERO_GPU_MIN_SPEECH_FRAME_RATIO", MIN_SPEECH_FRAME_RATIO), min_tonal_frame_ratio=env_float("LISPER_ZERO_GPU_MIN_TONAL_FRAME_RATIO", MIN_TONAL_FRAME_RATIO), min_sibilant_frame_ratio=env_float("LISPER_ZERO_GPU_MIN_SIBILANT_FRAME_RATIO", MIN_SIBILANT_FRAME_RATIO), max_noise_flatness=env_float("LISPER_ZERO_GPU_MAX_NOISE_FLATNESS", MAX_NOISE_FLATNESS), max_clipping_ratio=env_float("LISPER_ZERO_GPU_MAX_CLIPPING_RATIO", MAX_CLIPPING_RATIO), clear_min_confidence=live_clear_min_confidence(), clear_min_margin=live_clear_min_margin(), nonclear_min_confidence=live_nonclear_min_confidence(), nonclear_min_margin=live_nonclear_min_margin(), ) def audio_alignment_enabled() -> bool: default = "0" if adapter_id() else "1" return os.environ.get("LISPER_ZERO_GPU_ALIGN_AUDIO_TOKENS", default).strip() != "0" def gemma_generation_enabled() -> bool: return os.environ.get("LISPER_ZERO_GPU_USE_GEMMA_GENERATION", "0").strip() != "0" def torch_dtype() -> torch.dtype: requested = os.environ.get("LISPER_ZERO_GPU_DTYPE", "float16").strip().lower() if requested == "bfloat16": return torch.bfloat16 if requested == "float32": return torch.float32 return torch.float16 def auth_token() -> str | None: token = os.environ.get("HF_TOKEN", "").strip() return token or None def decode_browser_audio_payload(payload: str) -> np.ndarray: try: parsed = json.loads(payload) data_url = str(parsed.get("data_url") or "") if "," not in data_url: raise ValueError("Browser recorder payload is missing audio data.") _, encoded = data_url.split(",", 1) audio_bytes = base64.b64decode(encoded) waveform, sample_rate = sf.read(io.BytesIO(audio_bytes), dtype="float32", always_2d=False) except Exception as exc: raise InvalidAudioError( f"Browser recording could not be decoded: {type(exc).__name__}: {exc}", {"status": "invalid_browser_audio_payload"}, ) from exc if waveform.ndim > 1: waveform = waveform.mean(axis=1) if sample_rate != 16000: waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000) return waveform.astype(np.float32) def normalize_audio(audio_value: str | tuple[int, np.ndarray] | None) -> np.ndarray: if audio_value is None: raise gr.Error("Record or upload a short audio clip first.") if isinstance(audio_value, str): if audio_value.strip().startswith("{"): waveform = decode_browser_audio_payload(audio_value) sample_rate = 16000 else: waveform, sample_rate = sf.read(audio_value, dtype="float32", always_2d=False) else: sample_rate, waveform = audio_value waveform = np.asarray(waveform, dtype=np.float32) if waveform.ndim > 1: waveform = waveform.mean(axis=1) if sample_rate != 16000: waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000) # Keep ZeroGPU requests bounded. max_samples = 12 * 16000 if waveform.shape[0] > max_samples: waveform = waveform[:max_samples] peak = float(np.max(np.abs(waveform))) if waveform.size else 0.0 if peak > 1.0: waveform = waveform / peak return waveform.astype(np.float32) def audio_diagnostics(waveform: np.ndarray, sample_rate: int = 16000) -> dict[str, Any]: return compute_live_audio_diagnostics(waveform, sample_rate=sample_rate, policy=live_audio_policy()) def validate_audio_for_analysis(waveform: np.ndarray) -> dict[str, Any]: diagnostics = audio_diagnostics(waveform) decision = validate_live_audio_diagnostics(diagnostics, live_audio_policy()) if decision["status"] != "accepted": diagnostics["live_audio_gate"] = decision raise InvalidAudioError(str(decision["reason"]), diagnostics) return diagnostics def write_temp_audio(waveform: np.ndarray) -> str: fd, path = tempfile.mkstemp(prefix="lisper-zero-gpu-", suffix=".wav") os.close(fd) sf.write(path, waveform, 16000) return path def build_messages(target_text: str, audio_url: str, acoustic_result: dict[str, Any] | None = None) -> list[dict[str, Any]]: instruction = DEFAULT_PROMPT if target_text.strip(): instruction += f'\n\nTarget text: "{target_text.strip()}"' if acoustic_result: instruction += ( "\n\nAcoustic pre-analysis from the waveform: " f"class={acoustic_result['detected_class']}, " f"confidence={acoustic_result['confidence']:.3f}. " "Use this exact class for the Detected class line. Do not override it." ) return [ { "role": "system", "content": [ { "type": "text", "text": "You are Lisper, a supportive speech-therapy assistant focused on concise lisp coaching.", } ], }, { "role": "user", "content": [ {"type": "audio", "url": audio_url}, {"type": "text", "text": instruction}, ], }, ] def build_runtime() -> tuple[Any, Any]: repo_id = model_id() adapter_repo_id = adapter_id() token = auth_token() if adapter_repo_id: import unsloth # noqa: F401 from unsloth import FastVisionModel kwargs = { "model_name": adapter_repo_id, "max_seq_length": max_seq_length(), "load_in_4bit": load_in_4bit_enabled(), "full_finetuning": False, } if token: kwargs["token"] = token model, processor = FastVisionModel.from_pretrained(**kwargs) FastVisionModel.for_inference(model) model.eval() return processor, model processor_source = adapter_repo_id or repo_id processor = AutoProcessor.from_pretrained(processor_source, token=token, trust_remote_code=True) model = Gemma4ForConditionalGeneration.from_pretrained( repo_id, token=token, torch_dtype=torch_dtype(), device_map={"": "cuda"}, trust_remote_code=True, ) model.eval() return processor, model RUNTIME: tuple[Any, Any] | None = build_runtime() if eager_load_enabled() else None LAST_INPUT_SUMMARY: dict[str, Any] = {} def load_runtime() -> tuple[Any, Any]: global RUNTIME if RUNTIME is None: RUNTIME = build_runtime() return RUNTIME def strip_generation_artifacts(text: str) -> str: return text.replace("```", "").replace("", "").strip() def extract_line(label: str, text: str) -> str: match = re.search(rf"^{label}:\s*(.+)$", text, flags=re.IGNORECASE | re.MULTILINE) return match.group(1).strip() if match else "" def normalize_label(text: str) -> str: value = text.strip().lower() if "inconclusive" in value or "unclear" in value: return "inconclusive" for candidate in ALLOWED_CLASSES: if candidate in value: return candidate return "inconclusive" def parse_response(response: str) -> dict[str, Any]: detected = normalize_label(extract_line("Detected class", response)) return { "detected_class": detected, "reason": extract_line("Reason", response), "corrective_cue": extract_line("Corrective cue", response), "encouragement": extract_line("Encouragement", response), "raw_response": response, "model_id": model_id(), "adapter_id": adapter_id() or None, } def acoustic_normalize_audio(audio: np.ndarray) -> np.ndarray: audio = np.asarray(audio, dtype=np.float32).reshape(-1) if audio.size == 0: return audio audio = audio - float(np.mean(audio)) peak = float(np.max(np.abs(audio))) if peak > 0: audio = audio * (0.98 / peak) return audio.astype(np.float32) def frame_audio(audio: np.ndarray, sr: int, frame_ms: float = 25.0, hop_ms: float = 10.0) -> np.ndarray: frame = max(1, int(sr * frame_ms / 1000)) hop = max(1, int(sr * hop_ms / 1000)) if len(audio) < frame: audio = np.pad(audio, (0, frame - len(audio))) count = 1 + (len(audio) - frame) // hop shape = (count, frame) strides = (audio.strides[0] * hop, audio.strides[0]) return np.lib.stride_tricks.as_strided(audio, shape=shape, strides=strides).copy() def summarize_feature_values(values: np.ndarray) -> list[float]: values = np.asarray(values, dtype=np.float64) values = values[np.isfinite(values)] if values.size == 0: return [0.0] * 6 return [ float(np.mean(values)), float(np.std(values)), float(np.min(values)), float(np.max(values)), float(np.percentile(values, 10)), float(np.percentile(values, 90)), ] def extract_acoustic_features(audio: np.ndarray, sr: int = 16000) -> np.ndarray: if audio.size == 0: return np.zeros(88, dtype=np.float32) audio = acoustic_normalize_audio(audio) frames = frame_audio(audio, sr) window = np.hanning(frames.shape[1]).astype(np.float32) spectra = np.abs(np.fft.rfft(frames * window, axis=1)).astype(np.float64) freqs = np.fft.rfftfreq(frames.shape[1], d=1.0 / sr).astype(np.float64) power = spectra**2 eps = 1e-10 total = power.sum(axis=1) + eps centroid = (power * freqs).sum(axis=1) / total bandwidth = np.sqrt((power * (freqs[None, :] - centroid[:, None]) ** 2).sum(axis=1) / total) cumulative = np.cumsum(power, axis=1) rolloff_idx = np.argmax(cumulative >= 0.85 * total[:, None], axis=1) rolloff = freqs[rolloff_idx] flatness = np.exp(np.mean(np.log(power + eps), axis=1)) / (np.mean(power + eps, axis=1)) zcr = np.mean(np.abs(np.diff(np.signbit(frames), axis=1)), axis=1) rms = np.sqrt(np.mean(frames**2, axis=1) + eps) entropy = -(power / total[:, None] * np.log((power / total[:, None]) + eps)).sum(axis=1) / math.log( power.shape[1] ) def band_ratio(low: float, high: float) -> np.ndarray: mask = (freqs >= low) & (freqs < high) if not np.any(mask): return np.zeros(power.shape[0]) return power[:, mask].sum(axis=1) / total bands = [ band_ratio(0, 800), band_ratio(800, 1800), band_ratio(1800, 3200), band_ratio(3200, 5000), band_ratio(5000, min(7900, sr / 2)), band_ratio(3500, min(7500, sr / 2)), ] deltas = np.diff(centroid, prepend=centroid[0]) features: list[float] = [ float(len(audio) / sr), float(np.mean(audio)), float(np.std(audio)), float(np.max(np.abs(audio))), ] for values in [centroid, bandwidth, rolloff, flatness, zcr, rms, entropy, deltas, *bands]: features.extend(summarize_feature_values(values)) return np.asarray(features, dtype=np.float32) ACOUSTIC_MODEL: dict[str, Any] | None = None ACOUSTIC_EXTRATREES_MODEL: dict[str, Any] | None = None def load_acoustic_model() -> dict[str, Any] | None: global ACOUSTIC_MODEL if not acoustic_hint_enabled(): return None if ACOUSTIC_MODEL is None: if not ACOUSTIC_MODEL_PATH.exists(): return None ACOUSTIC_MODEL = json.loads(ACOUSTIC_MODEL_PATH.read_text(encoding="utf-8")) return ACOUSTIC_MODEL def load_acoustic_extratrees_model() -> dict[str, Any] | None: global ACOUSTIC_EXTRATREES_MODEL if not acoustic_hint_enabled(): return None if not ACOUSTIC_EXTRATREES_MODEL_PATH.exists(): return None if ACOUSTIC_EXTRATREES_MODEL is None: import joblib ACOUSTIC_EXTRATREES_MODEL = joblib.load(ACOUSTIC_EXTRATREES_MODEL_PATH) return ACOUSTIC_EXTRATREES_MODEL def classify_acoustic_extratrees(waveform: np.ndarray) -> dict[str, Any] | None: model = load_acoustic_extratrees_model() if model is None: return None features = extract_acoustic_features(waveform, sr=int(model.get("sample_rate", 16000))).reshape(1, -1) classifier = model["classifier"] prediction = str(classifier.predict(features)[0]) confidence = 1.0 class_scores: dict[str, float] = {} if hasattr(classifier, "predict_proba"): probabilities = classifier.predict_proba(features)[0] classes = [str(label) for label in classifier.classes_] class_scores = { label: round(float(probability), 6) for label, probability in sorted(zip(classes, probabilities), key=lambda item: item[1], reverse=True) } confidence = float(class_scores.get(prediction, 0.0)) return { "detected_class": prediction, "raw_class": prediction, "confidence": confidence, "class_scores": class_scores, "model_name": model.get("name", "lisper_v18_extratrees_acoustic_hint"), "train_rows": model.get("train_rows"), "feature_count": model.get("feature_count"), "holdout_accuracy": model.get("holdout_accuracy"), "low_confidence_defaulted_to_clear": False, } def apply_live_clear_guard(result: dict[str, Any] | None) -> dict[str, Any] | None: if result is not None: result["live_clear_guard_applied"] = False return result def classify_acoustic_knn(waveform: np.ndarray) -> dict[str, Any] | None: model = load_acoustic_model() if model is None: return None features = extract_acoustic_features(waveform, sr=int(model.get("sample_rate", 16000))) mean = np.asarray(model["mean"], dtype=np.float32) std = np.asarray(model["std"], dtype=np.float32) normalized = (features - mean) / np.where(std < 1e-6, 1.0, std) distances = [] for exemplar in model["exemplars"]: exemplar_features = np.asarray(exemplar["features"], dtype=np.float32) distance = float(np.linalg.norm(normalized - exemplar_features)) distances.append((distance, exemplar["label"], exemplar.get("source_id", ""))) distances.sort(key=lambda item: item[0]) class_scores: dict[str, float] = {label: 0.0 for label in model["classes"]} for distance, label, _source_id in distances[:ACOUSTIC_K]: class_scores[label] += 1.0 / max(distance, 1e-4) ranked = sorted(class_scores.items(), key=lambda item: item[1], reverse=True) top_label, top_score = ranked[0] total_score = sum(class_scores.values()) or 1.0 confidence = float(top_score / total_score) detected_class = top_label if confidence >= ACOUSTIC_MIN_CONFIDENCE else "clear" return { "detected_class": detected_class, "raw_class": top_label, "confidence": confidence, "nearest_distance": round(distances[0][0], 4), "nearest_source_id": distances[0][2], "class_scores": {label: round(float(score), 6) for label, score in ranked}, "model_name": model.get("name"), "low_confidence_defaulted_to_clear": detected_class == "clear" and top_label != "clear", } def _compact_acoustic_result(result: dict[str, Any] | None) -> dict[str, Any] | None: if result is None: return None keys = ( "detected_class", "raw_class", "confidence", "nearest_distance", "nearest_source_id", "class_scores", "model_name", "low_confidence_defaulted_to_clear", ) return {key: result[key] for key in keys if key in result} def maybe_apply_knn_override( extratrees_result: dict[str, Any] | None, knn_result: dict[str, Any] | None, ) -> dict[str, Any] | None: if extratrees_result is None or knn_result is None: return extratrees_result knn_label = normalize_label(str(knn_result.get("raw_class") or knn_result.get("detected_class") or "")) if knn_label == "clear": return extratrees_result confidence = float(knn_result.get("confidence") or 0.0) nearest_distance = float(knn_result.get("nearest_distance") or math.inf) max_distance = knn_override_max_distance() min_confidence = knn_override_min_confidence() if confidence < min_confidence or nearest_distance > max_distance: return { **extratrees_result, "hybrid_override_applied": False, "hybrid_override_reason": "knn_not_close_enough", "hybrid_override_thresholds": { "max_distance": max_distance, "min_confidence": min_confidence, }, "knn_result": _compact_acoustic_result(knn_result), } class_scores = { label: float(score) for label, score in (knn_result.get("class_scores") or {}).items() if normalize_label(str(label)) in ALLOWED_CLASSES } return { **extratrees_result, "detected_class": knn_label, "raw_class": knn_label, "confidence": confidence, "class_scores": class_scores, "model_name": "lisper_hybrid_extratrees_knn_synthetic_override", "low_confidence_defaulted_to_clear": False, "hybrid_override_applied": True, "hybrid_override_reason": "knn_close_synthetic_exemplar", "hybrid_override_thresholds": { "max_distance": max_distance, "min_confidence": min_confidence, }, "extratrees_result": _compact_acoustic_result(extratrees_result), "knn_result": _compact_acoustic_result(knn_result), } def classify_acoustic(waveform: np.ndarray) -> dict[str, Any] | None: preference = acoustic_model_preference() if preference == "extratrees": return classify_acoustic_extratrees(waveform) if preference == "knn": return classify_acoustic_knn(waveform) extratrees_result = classify_acoustic_extratrees(waveform) knn_result = classify_acoustic_knn(waveform) if extratrees_result is not None: return maybe_apply_knn_override(extratrees_result, knn_result) return knn_result def enforce_acoustic_response(response: str, acoustic_result: dict[str, Any] | None) -> tuple[str, dict[str, Any]]: parsed = parse_response(response) if not acoustic_result: return response, parsed detected_class = normalize_label(str(acoustic_result["detected_class"])) if acoustic_result.get("live_clear_guard_applied"): template = GUARDED_CLASS_TEMPLATES.get(detected_class, CLASS_TEMPLATES[detected_class]) else: template = CLASS_TEMPLATES[detected_class] encouragement = parsed.get("encouragement") or "Good effort. One focused repetition is enough for the next try." final_response = "\n".join( [ f"Detected class: {detected_class}", f"Reason: {template['reason']}", f"Corrective cue: {template['cue']}", f"Encouragement: {encouragement}", ] ) final_parsed = parse_response(final_response) final_parsed["raw_model_response"] = response final_parsed["acoustic_hint_enforced"] = True return final_response, final_parsed def build_inconclusive_response( decision: dict[str, Any], acoustic_result: dict[str, Any] | None, clip_diagnostics: dict[str, Any], ) -> tuple[str, dict[str, Any]]: reason = str(decision.get("decision_reason") or "The clip was not reliable enough to classify.") if decision.get("status") == "error": response = "Analysis unavailable. The acoustic model is not loaded, so Lisper will not guess a class." else: response = "\n".join( [ "Detected class: inconclusive", f"Reason: {reason}", "Corrective cue: Record one clear phrase with /s/ or /z/ sounds, close to the microphone, then try again.", "Encouragement: The clip was captured; we just need a cleaner attempt before giving a label.", ] ) parsed = parse_response(response) parsed["status"] = str(decision.get("status") or "inconclusive") parsed["raw_model_response"] = None parsed["acoustic_hint_enforced"] = False parsed["audio_diagnostics"] = clip_diagnostics parsed["acoustic_analysis"] = acoustic_result parsed["live_audio_gate"] = decision return response, parsed def build_detected_acoustic_response( acoustic_result: dict[str, Any], decision: dict[str, Any], clip_diagnostics: dict[str, Any], ) -> tuple[str, dict[str, Any]]: detected_class = normalize_label(str(decision.get("detected_class") or acoustic_result.get("detected_class") or "")) if detected_class not in CLASS_TEMPLATES: detected_class = "inconclusive" if detected_class == "inconclusive": return build_inconclusive_response( { **decision, "status": "inconclusive", "decision_reason": "The live gate did not produce a valid class label.", }, acoustic_result, clip_diagnostics, ) template = CLASS_TEMPLATES[detected_class] response = "\n".join( [ f"Detected class: {detected_class}", f"Reason: {template['reason']}", f"Corrective cue: {template['cue']}", "Encouragement: Nice work getting a usable recording. Try one focused repetition next.", ] ) parsed = parse_response(response) parsed["status"] = "detected" parsed["raw_model_response"] = None parsed["acoustic_hint_enforced"] = True parsed["gemma_generation_skipped"] = True parsed["audio_diagnostics"] = clip_diagnostics parsed["acoustic_analysis"] = acoustic_result parsed["live_audio_gate"] = decision return response, parsed def build_audio_only_inconclusive_decision(clip_diagnostics: dict[str, Any]) -> dict[str, Any] | None: policy = live_audio_policy() if clip_diagnostics["sibilant_frame_ratio"] >= policy.min_sibilant_frame_ratio: return None return { "status": "inconclusive", "detected_class": "inconclusive", "candidate_class": None, "decision_reason": "The clip has speech energy, but not enough usable /s/ or /z/ airflow evidence.", "thresholds": { "min_audio_seconds": policy.min_audio_seconds, "min_peak": policy.min_peak, "min_rms": policy.min_rms, "min_voiced_ratio": policy.min_voiced_ratio, "min_speech_frame_ratio": policy.min_speech_frame_ratio, "min_tonal_frame_ratio": policy.min_tonal_frame_ratio, "min_sibilant_frame_ratio": policy.min_sibilant_frame_ratio, "max_noise_flatness": policy.max_noise_flatness, "max_clipping_ratio": policy.max_clipping_ratio, "clear_min_confidence": policy.clear_min_confidence, "clear_min_margin": policy.clear_min_margin, "nonclear_min_confidence": policy.nonclear_min_confidence, "nonclear_min_margin": policy.nonclear_min_margin, }, "audio_diagnostics": clip_diagnostics, "classifier": {"available": False, "skipped": "insufficient_sibilant_evidence"}, } def audio_token_id(processor: Any) -> int | None: value = getattr(processor, "audio_token_id", None) if value is not None: return int(value) tokenizer = getattr(processor, "tokenizer", None) token = getattr(processor, "audio_token", None) or getattr(tokenizer, "audio_token", None) if tokenizer is not None and token is not None: return int(tokenizer.convert_tokens_to_ids(token)) return None def replace_audio_token_run(input_ids: torch.Tensor, token_id: int, count: int) -> tuple[torch.Tensor, dict[str, int]]: positions = (input_ids == token_id).nonzero(as_tuple=False).flatten() if positions.numel() == 0: return input_ids, {"original_audio_tokens": 0, "aligned_audio_tokens": count} start = int(positions[0].item()) end = start while end < input_ids.shape[0] and int(input_ids[end].item()) == token_id: end += 1 replacement = torch.full((count,), token_id, dtype=input_ids.dtype, device=input_ids.device) aligned = torch.cat([input_ids[:start], replacement, input_ids[end:]], dim=0) return aligned, {"original_audio_tokens": end - start, "aligned_audio_tokens": count} def model_inference_dtype(model: Any) -> torch.dtype: dtype = getattr(model, "dtype", None) if dtype is not None: return dtype base_model = getattr(model, "base_model", None) dtype = getattr(base_model, "dtype", None) return dtype or torch_dtype() def module_parameter_dtype(module: Any) -> torch.dtype: try: return next(module.parameters()).dtype except StopIteration: return model_inference_dtype(module) def audio_input_dtype(model: Any) -> torch.dtype: requested = os.environ.get("LISPER_ZERO_GPU_AUDIO_DTYPE", "").strip().lower() if requested == "float16": return torch.float16 if requested == "float32": return torch.float32 if requested == "bfloat16" or adapter_id(): return torch.bfloat16 return module_parameter_dtype(audio_feature_module(model)) def audio_feature_module(model: Any) -> Any: candidates = [ model, getattr(model, "model", None), getattr(model, "base_model", None), getattr(getattr(model, "base_model", None), "model", None), getattr(getattr(getattr(model, "base_model", None), "model", None), "model", None), ] for candidate in candidates: if candidate is not None and hasattr(candidate, "get_audio_features"): return candidate raise AttributeError("Could not locate Gemma audio feature module on loaded model.") def summarize_inputs(inputs: Any) -> dict[str, Any]: summary: dict[str, Any] = {} for key, value in dict(inputs).items(): if hasattr(value, "shape") and hasattr(value, "dtype"): summary[key] = { "shape": [int(dim) for dim in value.shape], "dtype": str(value.dtype), "device": str(getattr(value, "device", "")), } else: summary[key] = {"type": type(value).__name__} return summary def align_audio_placeholders(inputs: Any, processor: Any, model: Any) -> tuple[Any, dict[str, int]]: if not audio_alignment_enabled(): return inputs, {"audio_alignment_skipped": 1} if "input_features" not in inputs or "input_features_mask" not in inputs: return inputs, {} token_id = audio_token_id(processor) if token_id is None: return inputs, {} with torch.inference_mode(): audio_output = audio_feature_module(model).get_audio_features( inputs["input_features"], inputs["input_features_mask"], return_dict=True, ) encoded_count = int(audio_output.attention_mask.sum().item()) if encoded_count <= 0 or inputs["input_ids"].shape[0] != 1: return inputs, {"encoded_audio_tokens": encoded_count} new_input_ids, metadata = replace_audio_token_run(inputs["input_ids"][0], token_id, encoded_count) metadata["encoded_audio_tokens"] = encoded_count if metadata["original_audio_tokens"] == encoded_count: return inputs, metadata inputs["input_ids"] = new_input_ids.unsqueeze(0) inputs["attention_mask"] = torch.ones_like(inputs["input_ids"]) if "mm_token_type_ids" in inputs and hasattr(processor, "create_mm_token_type_ids"): mm_token_type_ids = processor.create_mm_token_type_ids(inputs["input_ids"].detach().cpu()) inputs["mm_token_type_ids"] = torch.as_tensor( mm_token_type_ids, dtype=inputs["input_ids"].dtype, device=inputs["input_ids"].device, ) return inputs, metadata def _analyze_impl(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]: global LAST_INPUT_SUMMARY waveform = normalize_audio(audio) clip_diagnostics = validate_audio_for_analysis(waveform) audio_only_decision = build_audio_only_inconclusive_decision(clip_diagnostics) if audio_only_decision is not None: response, parsed = build_inconclusive_response(audio_only_decision, None, clip_diagnostics) return response, json.dumps(parsed, indent=2) acoustic_result = classify_acoustic(waveform) live_decision = decide_live_analysis(acoustic_result, clip_diagnostics, live_audio_policy()) if live_decision["status"] != "detected": response, parsed = build_inconclusive_response(live_decision, acoustic_result, clip_diagnostics) return response, json.dumps(parsed, indent=2) if not gemma_generation_enabled(): response, parsed = build_detected_acoustic_response(acoustic_result, live_decision, clip_diagnostics) return response, json.dumps(parsed, indent=2) audio_url = write_temp_audio(waveform) processor, model = load_runtime() messages = build_messages(target_text, audio_url, acoustic_result) inputs = processor.apply_chat_template( messages, tokenize=True, add_generation_prompt=True, enable_thinking=False, return_dict=True, return_tensors="pt", ) device = next(model.parameters()).device if hasattr(inputs, "to"): inputs = inputs.to(device) for float_key in ("input_features", "pixel_values", "pixel_values_videos"): if float_key in inputs and hasattr(inputs[float_key], "to"): inputs[float_key] = inputs[float_key].to(dtype=model_inference_dtype(model)) if "input_features" in inputs: audio_dtype = audio_input_dtype(model) inputs["input_features"] = inputs["input_features"].to(dtype=audio_dtype) if "input_features_mask" in inputs and hasattr(inputs["input_features_mask"], "to"): inputs["input_features_mask"] = inputs["input_features_mask"].to(device=device) inputs, alignment = align_audio_placeholders(inputs, processor, model) LAST_INPUT_SUMMARY = summarize_inputs(inputs) with torch.inference_mode(): outputs = model.generate( **inputs, max_new_tokens=max_new_tokens(), do_sample=False, use_cache=True, ) prompt_length = inputs["input_ids"].shape[1] decoded = processor.decode(outputs[0][prompt_length:], skip_special_tokens=True) raw_response = strip_generation_artifacts(decoded) response, parsed = enforce_acoustic_response(raw_response, acoustic_result) parsed["status"] = "detected" parsed["audio_token_alignment"] = alignment parsed["audio_diagnostics"] = clip_diagnostics parsed["acoustic_analysis"] = acoustic_result parsed["live_audio_gate"] = live_decision return response, json.dumps(parsed, indent=2) def analyze_with_errors(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]: try: return _analyze_impl(audio, target_text) except InvalidAudioError as exc: payload = { "status": "rejected_audio", "reason": str(exc), "audio_diagnostics": exc.diagnostics, } return ( "Recording not usable yet. Please record a clear speech clip before analysis.", json.dumps(payload, indent=2), ) except Exception as exc: payload = { "error_type": type(exc).__name__, "message": str(exc), "model_id": model_id(), "adapter_id": adapter_id() or None, "dtype": os.environ.get("LISPER_ZERO_GPU_DTYPE", "float16"), "load_in_4bit": load_in_4bit_enabled(), "acoustic_hint_enabled": acoustic_hint_enabled(), "audio_alignment_enabled": audio_alignment_enabled(), "zero_gpu_size": zero_gpu_size(), "input_summary": LAST_INPUT_SUMMARY, "traceback": traceback.format_exc(limit=8), } return f"ZeroGPU inference failed: {type(exc).__name__}: {exc}", json.dumps(payload, indent=2) def analyze(audio: str | tuple[int, np.ndarray] | None, target_text: str) -> tuple[str, str]: return analyze_with_errors(audio, target_text) @spaces.GPU(duration=5, size=zero_gpu_size()) def zero_gpu_healthcheck() -> str: return "ok" def analysis_started(browser_recording_payload: str, uploaded_audio: str | tuple[int, np.ndarray] | None) -> tuple[str, str]: if not browser_recording_payload.strip() and uploaded_audio is None: return ( "No clip ready yet. Use the browser recorder or upload a short speech clip first.", json.dumps({"status": "waiting_for_audio"}, indent=2), ) return ( "Checking the recording quality and acoustic evidence...", json.dumps({"status": "running", "stage": "audio_preflight_then_acoustic_gate"}, indent=2), ) def analyze_ui( browser_recording_payload: str, uploaded_audio: str | tuple[int, np.ndarray] | None, target_text: str, ) -> tuple[str, str]: selected_audio: str | tuple[int, np.ndarray] | None = ( browser_recording_payload.strip() if browser_recording_payload.strip() else uploaded_audio ) if selected_audio is None: return ( "No clip ready yet. Use the browser recorder or upload a short speech clip first.", json.dumps({"status": "waiting_for_audio"}, indent=2), ) return analyze_with_errors(selected_audio, target_text) def build_app() -> gr.Blocks: with gr.Blocks(title="Lisper ZeroGPU", theme=gr.themes.Soft()) as demo: gr.Markdown( """ # Lisper ZeroGPU Server-side Gemma 4 audio analysis for users whose browser cannot comfortably run the WebGPU model. The currently validated fine-tuned Lisper model is Gemma 4 E2B. E4B and 31B are future model targets and should be deployed as separate revisions after training/eval. """ ) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### Browser recorder") browser_recording_payload = gr.Textbox( label="Browser recorder payload", visible=False, elem_id="lisper-browser-recorder-payload", ) browser_recording_status = gr.Markdown( "No browser recording ready. This path bypasses Gradio's microphone recorder." ) browser_recording_playback = gr.HTML("") with gr.Row(): browser_record = gr.Button("Record", variant="primary") browser_stop = gr.Button("Stop") browser_clear = gr.Button("Clear") gr.Markdown("### Upload fallback") audio = gr.Audio( sources=["upload"], type="filepath", label="Speech clip upload", editable=False, waveform_options=gr.WaveformOptions(show_recording_waveform=False), ) audio_status = gr.Markdown( "No uploaded clip ready. Use the browser recorder above, or upload an audio file here." ) target_text = gr.Textbox( label="Expected text", placeholder="Example: Sally sells seashells.", lines=2, ) run = gr.Button("Analyze", variant="primary") with gr.Column(scale=1): output = gr.Textbox(label="Gemma response", lines=8) parsed = gr.Code(label="Parsed JSON", language="json") gr.Markdown( f""" **Configured model:** `{model_id()}` **Configured adapter:** `{adapter_id() or "none"}` **Adapter 4-bit load:** `{load_in_4bit_enabled()}` **Acoustic hint:** `{acoustic_hint_enabled()}` **Audio token alignment:** `{audio_alignment_enabled()}` **ZeroGPU size:** `{zero_gpu_size()}` If this Space errors on private or gated models, add `HF_TOKEN` as a Space secret. For local development without downloading the model, set `LISPER_ZERO_GPU_EAGER_LOAD=0`. """ ) browser_record.click( None, inputs=[browser_recording_payload], outputs=[browser_recording_payload, browser_recording_status, browser_recording_playback], js=BROWSER_RECORDER_START_JS, queue=False, show_progress="hidden", ) browser_stop.click( None, inputs=[], outputs=[browser_recording_payload, browser_recording_status, browser_recording_playback], js=BROWSER_RECORDER_STOP_JS, queue=False, show_progress="hidden", ) browser_clear.click( None, inputs=[], outputs=[browser_recording_payload, browser_recording_status, browser_recording_playback], js=BROWSER_RECORDER_CLEAR_JS, queue=False, show_progress="hidden", ) audio.change( lambda: "Uploaded clip ready. Analyze is available.", inputs=[], outputs=[audio_status], queue=False, show_progress="hidden", ) audio.clear( lambda: "No uploaded clip ready. Use the browser recorder above, or upload an audio file here.", inputs=[], outputs=[audio_status], queue=False, show_progress="hidden", ) run.click( analysis_started, inputs=[browser_recording_payload, audio], outputs=[output, parsed], queue=False, ).then( analyze_ui, inputs=[browser_recording_payload, audio, target_text], outputs=[output, parsed], api_name="analyze", ) return demo demo = build_app() if __name__ == "__main__": demo.queue(default_concurrency_limit=1).launch(show_error=True)