import os import math import tempfile import numpy as np import gradio as gr import matplotlib.pyplot as plt from dataclasses import dataclass from typing import Dict, Any, Tuple, List, Optional import soundfile as sf from pydub import AudioSegment from scipy.signal import correlate # ========================================================= # Config # ========================================================= TARGET_SR = 16000 APP_DIR = os.path.dirname(os.path.abspath(__file__)) # ========================================================= # Helpers # ========================================================= def human_seconds(sec: float) -> str: if sec is None or not math.isfinite(sec): return "—" if sec < 60: return f"{sec:.1f}s" m = int(sec // 60) return f"{m}m {sec - 60*m:.1f}s" def safe_pct(x: float) -> str: if x is None or not math.isfinite(x): return "—" return f"{x*100:.1f}%" def list_bundled_audio() -> List[str]: exts = (".mp3", ".wav", ".m4a", ".flac", ".ogg") try: items = os.listdir(APP_DIR) except Exception: return [] files = [fn for fn in items if fn.lower().endswith(exts)] files.sort() return files def _resample_linear(y: np.ndarray, sr: int, target_sr: int) -> np.ndarray: if sr == target_sr or y.size == 0: return y x_old = np.linspace(0.0, 1.0, num=y.size, endpoint=False) new_len = int(round(y.size * (target_sr / sr))) x_new = np.linspace(0.0, 1.0, num=max(new_len, 1), endpoint=False) return np.interp(x_new, x_old, y).astype(np.float32) def load_audio_file(path: str) -> Tuple[np.ndarray, int]: """ Robust loader: - WAV/FLAC/OGG via soundfile - MP3/M4A via pydub (ffmpeg) Returns mono float32 waveform + sr. """ ext = os.path.splitext(path)[1].lower() if ext in [".wav", ".flac", ".ogg"]: y, sr = sf.read(path, always_2d=True) y = y.mean(axis=1).astype(np.float32) return y, int(sr) # MP3/M4A/etc. via pydub seg = AudioSegment.from_file(path) seg = seg.set_channels(1) sr = seg.frame_rate samples = np.array(seg.get_array_of_samples()) # Convert to float32 in [-1, 1] y = samples.astype(np.float32) / (2 ** (8 * seg.sample_width - 1)) return y, int(sr) def diagnostics_text() -> str: bundled = list_bundled_audio() lines = [] lines.append(f"**APP_DIR:** `{APP_DIR}`") lines.append(f"**CWD:** `{os.getcwd()}`") lines.append(f"**Bundled audio found:** {len(bundled)}") if bundled: for fn in bundled: full = os.path.join(APP_DIR, fn) try: size = os.path.getsize(full) lines.append(f"- `{fn}` ({size} bytes)") except Exception: lines.append(f"- `{fn}` (size unknown)") else: lines.append("- *(none found next to app.py)*") lines.append("") lines.append("**If build hangs:** usually heavy deps (e.g. librosa/numba). This version avoids them.") lines.append("**Microphone note:** may be blocked by browser permissions/corporate policy.") return "\n".join(lines) def _finite(x: float) -> bool: return x is not None and isinstance(x, (int, float, np.floating)) and math.isfinite(float(x)) # ========================================================= # Feature extraction (no librosa) # ========================================================= @dataclass class Features: duration_s: float rms_mean: float rms_std: float zcr_mean: float pitch_median_hz: float pitch_iqr_hz: float voiced_ratio: float n_pauses: int pause_total_s: float active_ratio: float def _frame_signal(y: np.ndarray, frame: int, hop: int) -> np.ndarray: if y.size < frame: return np.zeros((0, frame), dtype=np.float32) n = 1 + (y.size - frame) // hop idx = (np.arange(n)[:, None] * hop) + np.arange(frame)[None, :] return y[idx] def _rms_per_frame(frames: np.ndarray) -> np.ndarray: if frames.size == 0: return np.array([], dtype=np.float32) return np.sqrt(np.mean(frames * frames, axis=1) + 1e-12).astype(np.float32) def _zcr_per_frame(frames: np.ndarray) -> np.ndarray: if frames.size == 0: return np.array([], dtype=np.float32) signs = np.sign(frames) signs[signs == 0] = 1 zc = np.mean(signs[:, 1:] != signs[:, :-1], axis=1).astype(np.float32) return zc def _pitch_autocorr(frame: np.ndarray, sr: int, fmin: float = 70.0, fmax: float = 350.0) -> float: """ Simple autocorrelation pitch estimate for one frame. Returns Hz or NaN. """ if frame.size == 0: return float("nan") frame = frame - np.mean(frame) energy = np.sqrt(np.mean(frame * frame) + 1e-12) if energy < 0.01: return float("nan") ac = correlate(frame, frame, mode="full") ac = ac[ac.size // 2 :] min_lag = int(sr / fmax) max_lag = int(sr / fmin) if max_lag <= min_lag + 2 or max_lag >= ac.size: return float("nan") seg = ac[min_lag:max_lag] if seg.size == 0: return float("nan") i = int(np.argmax(seg)) lag = min_lag + i if lag <= 0: return float("nan") return float(sr / lag) def compute_features(y: np.ndarray, sr: int) -> Tuple[Features, Dict[str, Any]]: if y is None or y.size == 0: f = Features( duration_s=float("nan"), rms_mean=float("nan"), rms_std=float("nan"), zcr_mean=float("nan"), pitch_median_hz=float("nan"), pitch_iqr_hz=float("nan"), voiced_ratio=float("nan"), n_pauses=0, pause_total_s=0.0, active_ratio=float("nan"), ) return f, {"y": np.array([]), "sr": sr, "hop": 160, "pauses": [], "pitch": np.array([]), "times": np.array([])} # resample + normalize if sr != TARGET_SR: y = _resample_linear(y.astype(np.float32), sr, TARGET_SR) sr = TARGET_SR else: y = y.astype(np.float32) mx = float(np.max(np.abs(y))) + 1e-9 y = y / mx duration = float(y.size / sr) hop = 160 # 10ms frame = 400 # 25ms frames = _frame_signal(y, frame=frame, hop=hop) rms = _rms_per_frame(frames) zcr = _zcr_per_frame(frames) rms_mean = float(np.mean(rms)) if rms.size else float("nan") rms_std = float(np.std(rms)) if rms.size else float("nan") zcr_mean = float(np.mean(zcr)) if zcr.size else float("nan") # pitch per frame (simple + explainable) pitch = np.array([_pitch_autocorr(frames[i], sr) for i in range(frames.shape[0])], dtype=np.float32) times = (np.arange(pitch.size) * hop / sr).astype(np.float32) voiced = np.isfinite(pitch) & (pitch > 0) voiced_ratio = float(np.mean(voiced)) if voiced.size else float("nan") if np.any(voiced): pv = pitch[voiced] pitch_median = float(np.median(pv)) q75, q25 = np.percentile(pv, [75, 25]) pitch_iqr = float(q75 - q25) else: pitch_median = float("nan") pitch_iqr = float("nan") # pause detection via RMS threshold if rms.size: thr = float(np.percentile(rms, 20)) * 0.8 silent = rms < thr min_pause_frames = int(0.2 / (hop / sr)) pauses = [] start = None for i, s in enumerate(silent): if s and start is None: start = i if (not s) and start is not None: end = i if (end - start) >= min_pause_frames: pauses.append((start, end)) start = None if start is not None: end = len(silent) if (end - start) >= min_pause_frames: pauses.append((start, end)) n_pauses = int(len(pauses)) pause_total_s = float(sum((e - s) * (hop / sr) for s, e in pauses)) active_ratio = float(1.0 - np.mean(silent)) else: pauses = [] n_pauses = 0 pause_total_s = 0.0 active_ratio = float("nan") feats = Features( duration_s=duration, rms_mean=rms_mean, rms_std=rms_std, zcr_mean=zcr_mean, pitch_median_hz=pitch_median, pitch_iqr_hz=pitch_iqr, voiced_ratio=voiced_ratio, n_pauses=n_pauses, pause_total_s=pause_total_s, active_ratio=active_ratio, ) artifacts = {"y": y, "sr": sr, "hop": hop, "pauses": pauses, "pitch": pitch, "times": times} return feats, artifacts # ========================================================= # Plotting # ========================================================= def plot_waveform_with_pauses(art: Dict[str, Any]) -> plt.Figure: y = art["y"] sr = art["sr"] hop = art["hop"] pauses = art.get("pauses", []) fig = plt.figure(figsize=(10, 3.2)) ax = fig.add_subplot(111) if y.size: t = np.arange(len(y)) / sr ax.plot(t, y, linewidth=0.8) for (s, e) in pauses: ax.axvspan(s * (hop / sr), e * (hop / sr), alpha=0.2) ax.set_title("Waveform (with detected pauses)") ax.set_xlabel("Time (s)") ax.set_ylabel("Amplitude") else: ax.text(0.5, 0.5, "No audio", ha="center", va="center") ax.set_axis_off() fig.tight_layout() return fig def plot_pitch(art: Dict[str, Any]) -> plt.Figure: pitch = art.get("pitch", np.array([])) times = art.get("times", np.array([])) fig = plt.figure(figsize=(10, 3.2)) ax = fig.add_subplot(111) if pitch.size and times.size: ax.plot(times, pitch, linewidth=1.0) ax.set_title("Pitch contour (simple autocorrelation)") ax.set_xlabel("Time (s)") ax.set_ylabel("Pitch (Hz)") else: ax.text(0.5, 0.5, "Pitch not available", ha="center", va="center") ax.set_axis_off() fig.tight_layout() return fig # ========================================================= # Explanations + summary # ========================================================= def features_table(feats: Features) -> List[List[str]]: def f3(x): return "—" if (x is None or not math.isfinite(x)) else f"{float(x):.3f}" return [ ["Duration", human_seconds(feats.duration_s)], ["RMS mean", f3(feats.rms_mean)], ["RMS std", f3(feats.rms_std)], ["ZCR mean", f3(feats.zcr_mean)], ["Median pitch", "—" if not math.isfinite(feats.pitch_median_hz) else f"{feats.pitch_median_hz:.1f} Hz"], ["Pitch IQR", "—" if not math.isfinite(feats.pitch_iqr_hz) else f"{feats.pitch_iqr_hz:.1f} Hz"], ["Voiced ratio", safe_pct(feats.voiced_ratio)], ["# pauses (≥0.2s)", str(int(feats.n_pauses))], ["Total pause time", human_seconds(feats.pause_total_s)], ["Active speech ratio", safe_pct(feats.active_ratio)], ] def summary_of_changes(first: Features, last: Features) -> str: def fmt(x, unit=""): if not _finite(x): return "—" return f"{x:+.3f}{unit}" d_pause_total = (last.pause_total_s - first.pause_total_s) if (_finite(last.pause_total_s) and _finite(first.pause_total_s)) else float("nan") d_n_pauses = (last.n_pauses - first.n_pauses) d_pitch = (last.pitch_median_hz - first.pitch_median_hz) if (_finite(last.pitch_median_hz) and _finite(first.pitch_median_hz)) else float("nan") d_rms = (last.rms_mean - first.rms_mean) if (_finite(last.rms_mean) and _finite(first.rms_mean)) else float("nan") d_active = (last.active_ratio - first.active_ratio) if (_finite(last.active_ratio) and _finite(first.active_ratio)) else float("nan") lines = [] lines.append("### Summary of changes (last vs first)") lines.append("This compares the **first** and **last** recording in your selection (upload order).") lines.append("") lines.append("**Measured differences (Δ = last − first):**") lines.append(f"- Total pause time: **{fmt(d_pause_total, ' s')}**") lines.append(f"- Number of pauses: **{d_n_pauses:+d}**") lines.append(f"- Median pitch: **{fmt(d_pitch, ' Hz')}**") lines.append(f"- RMS energy: **{fmt(d_rms)}**") lines.append(f"- Active speech ratio: **{fmt(d_active * 100.0, ' %')}**") lines.append("") lines.append("**How to interpret (non-clinical):**") lines.append("- More pauses / lower active ratio can reflect hesitations, slower speech, fatigue, or different environment/microphone setup.") lines.append("- Pitch changes can reflect speaking style, prosody, emotion, or recording conditions.") lines.append("- Energy changes often reflect distance to microphone / loudness / background noise.") lines.append("") lines.append("**Important:** not a diagnosis. These are explainable signal-level comparisons.") return "\n".join(lines) # ========================================================= # Callbacks # ========================================================= def analyze_one(audio_path: Optional[str]): if not audio_path: return [], None, None, "### Upload or record audio to start." y, sr = load_audio_file(audio_path) feats, art = compute_features(y, sr) return features_table(feats), plot_waveform_with_pauses(art), plot_pitch(art), "### This shows measurable signals (no diagnosis)." def analyze_many_paths(paths: List[str]): if not paths or len(paths) < 2: return ( [[1, "—", "Upload/select at least 2 recordings.", "", "", "", "", ""]], None, "### Select at least 2 recordings to see a trend.", "### Summary will appear here." ) rows = [] pause_series, pitch_series, rms_series = [], [], [] feats_first = None feats_last = None for idx, path in enumerate(paths, start=1): name = os.path.basename(path) y, sr = load_audio_file(path) feats, _ = compute_features(y, sr) if idx == 1: feats_first = feats feats_last = feats pause_s = feats.pause_total_s if math.isfinite(feats.pause_total_s) else np.nan pitch_hz = feats.pitch_median_hz if math.isfinite(feats.pitch_median_hz) else np.nan rms_m = feats.rms_mean if math.isfinite(feats.rms_mean) else np.nan active = feats.active_ratio if math.isfinite(feats.active_ratio) else np.nan rows.append([ idx, name, human_seconds(feats.duration_s), int(feats.n_pauses), "" if not math.isfinite(pause_s) else f"{pause_s:.3f}", "" if not math.isfinite(pitch_hz) else f"{pitch_hz:.1f}", "" if not math.isfinite(rms_m) else f"{rms_m:.3f}", "" if not math.isfinite(active) else f"{active*100:.1f}%", ]) pause_series.append(pause_s) pitch_series.append(pitch_hz) rms_series.append(rms_m) fig = plt.figure(figsize=(10, 3.4)) ax = fig.add_subplot(111) x = np.arange(1, len(rows) + 1) ax.plot(x, pause_series, marker="o", linewidth=1.2, label="Total pause time (s)") ax.plot(x, pitch_series, marker="o", linewidth=1.2, label="Median pitch (Hz)") ax.plot(x, rms_series, marker="o", linewidth=1.2, label="RMS mean") ax.set_title("Trend across recordings (same person)") ax.set_xlabel("Recording # (order)") ax.set_ylabel("Value (different scales)") ax.legend(loc="best") fig.tight_layout() summary = "### Summary not available." if feats_first is not None and feats_last is not None: summary = summary_of_changes(feats_first, feats_last) return rows, fig, "### Trend over time (within-person).", summary def analyze_many_uploaded(files): paths = [] if files: for f in files: paths.append(getattr(f, "name", None) or str(f)) return analyze_many_paths(paths) def analyze_many_bundled(selected_filenames: List[str]): paths = [] if selected_filenames: paths = [os.path.join(APP_DIR, fn) for fn in selected_filenames] return analyze_many_paths(paths) def refresh_bundled(): bundled = list_bundled_audio() return gr.update(choices=bundled, value=[]), diagnostics_text() # ========================================================= # UI # ========================================================= CSS = """ :root{ --bg: #0b0f19; --shadow: 0 12px 30px rgba(0,0,0,0.35); } .gradio-container{ background: radial-gradient(1200px 700px at 10% 10%, rgba(124,58,237,0.25), transparent 55%), radial-gradient(900px 600px at 90% 20%, rgba(34,197,94,0.18), transparent 55%), radial-gradient(1100px 800px at 40% 100%, rgba(59,130,246,0.15), transparent 60%), var(--bg) !important; } #header{ background: rgba(255,255,255,0.92) !important; color: #0b0f19 !important; border: 1px solid rgba(0,0,0,0.08); border-radius: 18px; padding: 18px 18px 14px 18px; box-shadow: var(--shadow); } #header *{ color: #0b0f19 !important; } .card{ background: rgba(255,255,255,0.92) !important; color: #0b0f19 !important; border: 1px solid rgba(0,0,0,0.08); border-radius: 18px; padding: 14px; box-shadow: var(--shadow); } .card *{ color: #0b0f19 !important; } /* Tabs readable on dark background */ div[role="tablist"]{ background: rgba(255,255,255,0.06) !important; border: 1px solid rgba(255,255,255,0.14) !important; border-radius: 14px !important; padding: 6px !important; } button[role="tab"]{ color: rgba(255,255,255,0.92) !important; } button[role="tab"][aria-selected="true"]{ color: rgba(255,255,255,0.98) !important; border-bottom: 2px solid rgba(255,255,255,0.65) !important; } """ def build_ui(): bundled0 = list_bundled_audio() with gr.Blocks( css=CSS, theme=gr.themes.Soft(primary_hue="violet", secondary_hue="emerald"), title="Explainable Speech Analytics (Demo)", ) as demo: gr.HTML( """ """ ) with gr.Tabs(): with gr.TabItem("Single"): with gr.Row(): with gr.Column(scale=5): audio = gr.Audio(label="Audio", sources=["upload", "microphone"], type="filepath") run = gr.Button("Analyze", variant="primary") with gr.Column(scale=7): feats_df = gr.Dataframe(headers=["Feature", "Value"], interactive=False, wrap=True) wf_plot = gr.Plot(label="Waveform + pauses") pitch_plot = gr.Plot(label="Pitch") expl = gr.Markdown("### Upload or record audio to start.", elem_classes=["card"]) run.click(analyze_one, inputs=[audio], outputs=[feats_df, wf_plot, pitch_plot, expl]) with gr.TabItem("Timeline"): with gr.Row(): with gr.Column(scale=5): gr.Markdown("#### Option A — Upload") files = gr.Files(label="Upload multiple audio files", file_count="multiple", file_types=["audio"]) run_up = gr.Button("Analyze uploaded timeline", variant="primary") gr.Markdown("#### Option B — Bundled samples (repo root)") bundled_select = gr.CheckboxGroup(choices=bundled0, label="Bundled audio files") with gr.Row(): refresh_btn = gr.Button("Refresh list", variant="secondary") run_b = gr.Button("Analyze selected bundled", variant="secondary") with gr.Column(scale=7): timeline_df = gr.Dataframe( headers=["#", "File", "Duration", "Pauses", "Pause(s)", "Pitch(Hz)", "RMS", "Active %"], interactive=False, wrap=True, ) timeline_plot = gr.Plot(label="Trend plot") timeline_expl = gr.Markdown("### Select at least 2 recordings.", elem_classes=["card"]) timeline_summary = gr.Markdown("### Summary will appear here.", elem_classes=["card"]) run_up.click(analyze_many_uploaded, inputs=[files], outputs=[timeline_df, timeline_plot, timeline_expl, timeline_summary]) run_b.click(analyze_many_bundled, inputs=[bundled_select], outputs=[timeline_df, timeline_plot, timeline_expl, timeline_summary]) with gr.TabItem("Diagnostics"): diag = gr.Markdown(diagnostics_text(), elem_classes=["card"]) diag_refresh = gr.Button("Refresh diagnostics", variant="secondary") diag_refresh.click(lambda: diagnostics_text(), inputs=None, outputs=[diag]) refresh_btn.click(refresh_bundled, inputs=None, outputs=[bundled_select, diag]) return demo if __name__ == "__main__": demo = build_ui() demo.queue(max_size=32) port = int(os.environ.get("PORT", os.environ.get("GRADIO_SERVER_PORT", "7860"))) demo.launch(server_name="0.0.0.0", server_port=port)