Marcel0123's picture
Update app.py
d1034a7 verified
import os
import math
import tempfile
import numpy as np
import gradio as gr
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Dict, Any, Tuple, List, Optional
import soundfile as sf
from pydub import AudioSegment
from scipy.signal import correlate
# =========================================================
# Config
# =========================================================
# Sample rate (Hz) that compute_features() resamples every recording to
# before framing and feature extraction.
TARGET_SR = 16000
# Absolute directory containing this file; bundled audio samples are looked
# up here (see list_bundled_audio()).
APP_DIR = os.path.dirname(os.path.abspath(__file__))
# =========================================================
# Helpers
# =========================================================
def human_seconds(sec: float) -> str:
    """Format a duration in seconds as a short human-readable string.

    Values that display as less than a minute are rendered like ``"12.3s"``;
    longer values like ``"2m 5.3s"``. ``None`` and non-finite values are
    rendered as an em dash.

    The value is rounded to one decimal *before* it is split into minutes and
    seconds, so that a rounding carry rolls over into the minutes field
    (59.96 -> ``"1m 0.0s"`` rather than ``"60.0s"``, and 119.97 ->
    ``"2m 0.0s"`` rather than ``"1m 60.0s"``).
    """
    if sec is None or not math.isfinite(sec):
        return "—"
    rounded = round(float(sec), 1)
    if rounded < 60:
        return f"{rounded:.1f}s"
    minutes, seconds = divmod(rounded, 60)
    return f"{int(minutes)}m {seconds:.1f}s"
def safe_pct(x: float) -> str:
    """Render a fraction (0.5 -> ``"50.0%"``) with one decimal place.

    ``None`` and non-finite inputs are rendered as an em dash.
    """
    usable = x is not None and math.isfinite(x)
    return f"{x*100:.1f}%" if usable else "—"
def list_bundled_audio() -> List[str]:
    """Return the sorted names of audio files located directly in APP_DIR.

    A name qualifies when it ends (case-insensitively) with one of the
    recognised audio extensions. If the directory cannot be listed the result
    is an empty list.
    """
    audio_suffixes = (".mp3", ".wav", ".m4a", ".flac", ".ogg")
    try:
        entries = os.listdir(APP_DIR)
    except Exception:
        # Best effort: an unreadable directory simply means "nothing bundled".
        return []
    return sorted(name for name in entries if name.lower().endswith(audio_suffixes))
def _resample_linear(y: np.ndarray, sr: int, target_sr: int) -> np.ndarray:
if sr == target_sr or y.size == 0:
return y
x_old = np.linspace(0.0, 1.0, num=y.size, endpoint=False)
new_len = int(round(y.size * (target_sr / sr)))
x_new = np.linspace(0.0, 1.0, num=max(new_len, 1), endpoint=False)
return np.interp(x_new, x_old, y).astype(np.float32)
def load_audio_file(path: str) -> Tuple[np.ndarray, int]:
    """Load an audio file as a mono float32 waveform plus its sample rate.

    Dispatch is by file extension:
    - ``.wav`` / ``.flac`` / ``.ogg`` are read with soundfile and the
      channels are averaged to mono.
    - Everything else (MP3, M4A, ...) is decoded with pydub (ffmpeg),
      down-mixed to one channel, and scaled to [-1, 1] using the sample width
      reported by pydub.
    """
    suffix = os.path.splitext(path)[1].lower()
    if suffix in (".wav", ".flac", ".ogg"):
        data, rate = sf.read(path, always_2d=True)
        mono = data.mean(axis=1).astype(np.float32)
        return mono, int(rate)

    # MP3/M4A/etc. via pydub
    clip = AudioSegment.from_file(path).set_channels(1)
    rate = clip.frame_rate
    pcm = np.array(clip.get_array_of_samples())
    # Integer PCM -> float32 in [-1, 1]: divide by 2**(bits - 1).
    full_scale = 2 ** (8 * clip.sample_width - 1)
    return pcm.astype(np.float32) / full_scale, int(rate)
def diagnostics_text() -> str:
    """Build the Markdown shown on the Diagnostics tab.

    Reports APP_DIR, the current working directory and every bundled audio
    file with its size in bytes (or "size unknown" when the size cannot be
    read), followed by two fixed troubleshooting hints.
    """
    found = list_bundled_audio()
    report = [
        f"**APP_DIR:** `{APP_DIR}`",
        f"**CWD:** `{os.getcwd()}`",
        f"**Bundled audio found:** {len(found)}",
    ]
    if not found:
        report.append("- *(none found next to app.py)*")
    for name in found:
        location = os.path.join(APP_DIR, name)
        try:
            n_bytes = os.path.getsize(location)
        except Exception:
            report.append(f"- `{name}` (size unknown)")
        else:
            report.append(f"- `{name}` ({n_bytes} bytes)")
    report += [
        "",
        "**If build hangs:** usually heavy deps (e.g. librosa/numba). This version avoids them.",
        "**Microphone note:** may be blocked by browser permissions/corporate policy.",
    ]
    return "\n".join(report)
def _finite(x: float) -> bool:
return x is not None and isinstance(x, (int, float, np.floating)) and math.isfinite(float(x))
# =========================================================
# Feature extraction (no librosa)
# =========================================================
@dataclass
class Features:
    """Scalar summary of one recording, as filled in by ``compute_features``.

    Float fields are NaN when the value could not be computed (for example
    empty audio, or no frame with a pitch estimate).
    """
    # Length of the (resampled) signal in seconds.
    duration_s: float
    # Mean and standard deviation of the per-frame RMS of the
    # peak-normalised signal.
    rms_mean: float
    rms_std: float
    # Mean per-frame zero-crossing rate.
    zcr_mean: float
    # Median and inter-quartile range (75th - 25th percentile) of the
    # per-frame pitch estimates, over frames that have an estimate.
    pitch_median_hz: float
    pitch_iqr_hz: float
    # Fraction of frames with a finite, positive pitch estimate.
    voiced_ratio: float
    # Number of detected pauses and their summed duration in seconds.
    n_pauses: int
    pause_total_s: float
    # 1 - fraction of frames whose RMS fell below the silence threshold.
    active_ratio: float
def _frame_signal(y: np.ndarray, frame: int, hop: int) -> np.ndarray:
if y.size < frame:
return np.zeros((0, frame), dtype=np.float32)
n = 1 + (y.size - frame) // hop
idx = (np.arange(n)[:, None] * hop) + np.arange(frame)[None, :]
return y[idx]
def _rms_per_frame(frames: np.ndarray) -> np.ndarray:
if frames.size == 0:
return np.array([], dtype=np.float32)
return np.sqrt(np.mean(frames * frames, axis=1) + 1e-12).astype(np.float32)
def _zcr_per_frame(frames: np.ndarray) -> np.ndarray:
if frames.size == 0:
return np.array([], dtype=np.float32)
signs = np.sign(frames)
signs[signs == 0] = 1
zc = np.mean(signs[:, 1:] != signs[:, :-1], axis=1).astype(np.float32)
return zc
def _pitch_autocorr(frame: np.ndarray, sr: int, fmin: float = 70.0, fmax: float = 350.0) -> float:
"""
Simple autocorrelation pitch estimate for one frame.
Returns Hz or NaN.
"""
if frame.size == 0:
return float("nan")
frame = frame - np.mean(frame)
energy = np.sqrt(np.mean(frame * frame) + 1e-12)
if energy < 0.01:
return float("nan")
ac = correlate(frame, frame, mode="full")
ac = ac[ac.size // 2 :]
min_lag = int(sr / fmax)
max_lag = int(sr / fmin)
if max_lag <= min_lag + 2 or max_lag >= ac.size:
return float("nan")
seg = ac[min_lag:max_lag]
if seg.size == 0:
return float("nan")
i = int(np.argmax(seg))
lag = min_lag + i
if lag <= 0:
return float("nan")
return float(sr / lag)
def _silent_runs(silent: np.ndarray, min_len: int) -> List[Tuple[int, int]]:
    """Return ``(start, end)`` frame-index pairs (end exclusive) for every run
    of consecutive True values in *silent* at least *min_len* frames long."""
    runs = []
    run_start = None
    for i, is_silent in enumerate(silent):
        if is_silent:
            if run_start is None:
                run_start = i
        elif run_start is not None:
            if i - run_start >= min_len:
                runs.append((run_start, i))
            run_start = None
    # A run that is still open at the end of the signal closes at the last frame.
    n_frames = len(silent)
    if run_start is not None and n_frames - run_start >= min_len:
        runs.append((run_start, n_frames))
    return runs


def compute_features(y: np.ndarray, sr: int) -> Tuple[Features, Dict[str, Any]]:
    """Extract summary features and plotting artifacts from a mono waveform.

    The signal is resampled to TARGET_SR, peak-normalised and cut into
    400-sample frames with a 160-sample hop (25 ms / 10 ms at 16 kHz).
    Per-frame RMS, zero-crossing rate and autocorrelation pitch are then
    summarised, and pauses are detected as runs of low-RMS frames lasting at
    least 0.2 s.

    Returns:
        A ``(Features, artifacts)`` pair. ``artifacts`` holds the processed
        signal (``"y"``), its sample rate (``"sr"``), the hop size
        (``"hop"``), the pause list as ``(start_frame, end_frame)`` pairs
        (``"pauses"``), the per-frame pitch track (``"pitch"``) and the
        matching frame times in seconds (``"times"``). For ``None`` or empty
        input the float features are NaN and the arrays are empty.
    """
    nan = float("nan")

    if y is None or y.size == 0:
        blank = Features(
            duration_s=nan,
            rms_mean=nan,
            rms_std=nan,
            zcr_mean=nan,
            pitch_median_hz=nan,
            pitch_iqr_hz=nan,
            voiced_ratio=nan,
            n_pauses=0,
            pause_total_s=0.0,
            active_ratio=nan,
        )
        return blank, {"y": np.array([]), "sr": sr, "hop": 160, "pauses": [], "pitch": np.array([]), "times": np.array([])}

    # --- resample + peak-normalise ---
    y = y.astype(np.float32)
    if sr != TARGET_SR:
        y = _resample_linear(y, sr, TARGET_SR)
        sr = TARGET_SR
    peak = float(np.max(np.abs(y))) + 1e-9  # epsilon avoids dividing by zero on silence
    y = y / peak
    duration = float(y.size / sr)

    # --- framing ---
    hop = 160    # 10ms
    frame = 400  # 25ms
    frames = _frame_signal(y, frame=frame, hop=hop)
    rms = _rms_per_frame(frames)
    zcr = _zcr_per_frame(frames)

    if rms.size:
        rms_mean, rms_std = float(np.mean(rms)), float(np.std(rms))
    else:
        rms_mean = rms_std = nan
    zcr_mean = float(np.mean(zcr)) if zcr.size else nan

    # --- pitch per frame (simple + explainable) ---
    pitch = np.array([_pitch_autocorr(row, sr) for row in frames], dtype=np.float32)
    times = (np.arange(pitch.size) * hop / sr).astype(np.float32)
    voiced = np.isfinite(pitch) & (pitch > 0)
    voiced_ratio = float(np.mean(voiced)) if voiced.size else nan
    if np.any(voiced):
        voiced_pitch = pitch[voiced]
        pitch_median = float(np.median(voiced_pitch))
        q25, q75 = np.percentile(voiced_pitch, [25, 75])
        pitch_iqr = float(q75 - q25)
    else:
        pitch_median = pitch_iqr = nan

    # --- pause detection via RMS threshold ---
    if rms.size:
        # NOTE(review): the threshold is derived from this recording's own 20th
        # percentile, so at most ~20% of frames can ever be flagged silent --
        # confirm this is intended for recordings with long silences.
        thr = float(np.percentile(rms, 20)) * 0.8
        silent = rms < thr
        frame_step_s = hop / sr
        min_pause_frames = int(0.2 / frame_step_s)
        pauses = _silent_runs(silent, min_pause_frames)
        n_pauses = int(len(pauses))
        pause_total_s = float(sum((end - start) * frame_step_s for start, end in pauses))
        active_ratio = float(1.0 - np.mean(silent))
    else:
        pauses = []
        n_pauses = 0
        pause_total_s = 0.0
        active_ratio = nan

    feats = Features(
        duration_s=duration,
        rms_mean=rms_mean,
        rms_std=rms_std,
        zcr_mean=zcr_mean,
        pitch_median_hz=pitch_median,
        pitch_iqr_hz=pitch_iqr,
        voiced_ratio=voiced_ratio,
        n_pauses=n_pauses,
        pause_total_s=pause_total_s,
        active_ratio=active_ratio,
    )
    artifacts = {"y": y, "sr": sr, "hop": hop, "pauses": pauses, "pitch": pitch, "times": times}
    return feats, artifacts
# =========================================================
# Plotting
# =========================================================
def plot_waveform_with_pauses(art: Dict[str, Any]) -> plt.Figure:
    """Plot the waveform with each detected pause shaded.

    Args:
        art: Artifact dict with ``"y"`` (waveform), ``"sr"`` (sample rate),
            ``"hop"`` (frame hop in samples) and optionally ``"pauses"``
            (``(start_frame, end_frame)`` pairs).

    Returns:
        A Matplotlib figure. When the waveform is empty the figure shows a
        "No audio" placeholder instead of axes.
    """
    y = art["y"]
    sr = art["sr"]
    hop = art["hop"]
    pauses = art.get("pauses", [])

    # Build the figure directly instead of via plt.figure(): pyplot keeps a
    # global reference to every figure it creates until plt.close() is called,
    # which would accumulate one figure per request in a long-running server.
    fig = plt.Figure(figsize=(10, 3.2))
    ax = fig.add_subplot(111)
    if y.size:
        t = np.arange(len(y)) / sr
        ax.plot(t, y, linewidth=0.8)
        # Pause bounds are frame indices; convert them to seconds.
        for (s, e) in pauses:
            ax.axvspan(s * (hop / sr), e * (hop / sr), alpha=0.2)
        ax.set_title("Waveform (with detected pauses)")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("Amplitude")
    else:
        ax.text(0.5, 0.5, "No audio", ha="center", va="center")
        ax.set_axis_off()
    fig.tight_layout()
    return fig
def plot_pitch(art: Dict[str, Any]) -> plt.Figure:
    """Plot the per-frame pitch track against time.

    Args:
        art: Artifact dict; ``"pitch"`` (Hz per frame) and ``"times"``
            (seconds per frame) are read, each defaulting to an empty array.

    Returns:
        A Matplotlib figure. When either array is empty the figure shows a
        "Pitch not available" placeholder instead of axes.
    """
    pitch = art.get("pitch", np.array([]))
    times = art.get("times", np.array([]))

    # Build the figure directly instead of via plt.figure(): pyplot keeps a
    # global reference to every figure it creates until plt.close() is called,
    # which would accumulate one figure per request in a long-running server.
    fig = plt.Figure(figsize=(10, 3.2))
    ax = fig.add_subplot(111)
    if pitch.size and times.size:
        ax.plot(times, pitch, linewidth=1.0)
        ax.set_title("Pitch contour (simple autocorrelation)")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("Pitch (Hz)")
    else:
        ax.text(0.5, 0.5, "Pitch not available", ha="center", va="center")
        ax.set_axis_off()
    fig.tight_layout()
    return fig
# =========================================================
# Explanations + summary
# =========================================================
def features_table(feats: Features) -> List[List[str]]:
    """Turn a Features record into ``[label, value]`` rows for the UI table.

    Every value is pre-formatted as a string; values that are missing or
    non-finite are shown as an em dash.
    """
    dash = "—"

    def three_dp(value):
        if value is None or not math.isfinite(value):
            return dash
        return f"{float(value):.3f}"

    def hertz(value):
        return f"{value:.1f} Hz" if math.isfinite(value) else dash

    return [
        ["Duration", human_seconds(feats.duration_s)],
        ["RMS mean", three_dp(feats.rms_mean)],
        ["RMS std", three_dp(feats.rms_std)],
        ["ZCR mean", three_dp(feats.zcr_mean)],
        ["Median pitch", hertz(feats.pitch_median_hz)],
        ["Pitch IQR", hertz(feats.pitch_iqr_hz)],
        ["Voiced ratio", safe_pct(feats.voiced_ratio)],
        ["# pauses (≥0.2s)", str(int(feats.n_pauses))],
        ["Total pause time", human_seconds(feats.pause_total_s)],
        ["Active speech ratio", safe_pct(feats.active_ratio)],
    ]
def summary_of_changes(first: Features, last: Features) -> str:
    """Build the Markdown summary comparing the last recording with the first.

    Each delta is ``last - first``. A delta is shown as an em dash when either
    side is not a finite number. The pause count is always shown as a signed
    integer.
    """
    def signed(value, unit=""):
        return f"{value:+.3f}{unit}" if _finite(value) else "—"

    def delta(attr):
        after, before = getattr(last, attr), getattr(first, attr)
        if _finite(after) and _finite(before):
            return after - before
        return float("nan")

    d_pause_total = delta("pause_total_s")
    d_n_pauses = last.n_pauses - first.n_pauses
    d_pitch = delta("pitch_median_hz")
    d_rms = delta("rms_mean")
    d_active = delta("active_ratio")

    report = [
        "### Summary of changes (last vs first)",
        "This compares the **first** and **last** recording in your selection (upload order).",
        "",
        "**Measured differences (Δ = last − first):**",
        f"- Total pause time: **{signed(d_pause_total, ' s')}**",
        f"- Number of pauses: **{d_n_pauses:+d}**",
        f"- Median pitch: **{signed(d_pitch, ' Hz')}**",
        f"- RMS energy: **{signed(d_rms)}**",
        # The ratio delta is scaled by 100 for display as a percentage.
        f"- Active speech ratio: **{signed(d_active * 100.0, ' %')}**",
        "",
        "**How to interpret (non-clinical):**",
        "- More pauses / lower active ratio can reflect hesitations, slower speech, fatigue, or different environment/microphone setup.",
        "- Pitch changes can reflect speaking style, prosody, emotion, or recording conditions.",
        "- Energy changes often reflect distance to microphone / loudness / background noise.",
        "",
        "**Important:** not a diagnosis. These are explainable signal-level comparisons.",
    ]
    return "\n".join(report)
# =========================================================
# Callbacks
# =========================================================
def analyze_one(audio_path: Optional[str]):
    """Callback for the "Single" tab: analyse one recording.

    Returns ``(table_rows, waveform_figure, pitch_figure, markdown)``. When no
    path is supplied the table is empty, both figures are ``None`` and the
    markdown prompts for input.
    """
    if not audio_path:
        return [], None, None, "### Upload or record audio to start."

    feats, art = compute_features(*load_audio_file(audio_path))
    table = features_table(feats)
    waveform_fig = plot_waveform_with_pauses(art)
    pitch_fig = plot_pitch(art)
    return table, waveform_fig, pitch_fig, "### This shows measurable signals (no diagnosis)."
def analyze_many_paths(paths: List[str]):
    """Analyse several recordings in order and summarise the trend.

    Args:
        paths: Audio file paths, in the order they should appear on the
            timeline. At least two are required.

    Returns:
        ``(rows, figure, explanation_markdown, summary_markdown)``. ``rows``
        has one entry per recording: index, file name, duration, pause count,
        total pause seconds, median pitch, RMS mean and active percentage
        (blank strings for non-finite values). With fewer than two paths a
        single placeholder row is returned and the figure is ``None``.
    """
    if not paths or len(paths) < 2:
        return (
            [[1, "—", "Upload/select at least 2 recordings.", "", "", "", "", ""]],
            None,
            "### Select at least 2 recordings to see a trend.",
            "### Summary will appear here.",
        )

    rows = []
    pause_series, pitch_series, rms_series = [], [], []
    feats_first = None
    feats_last = None
    for idx, path in enumerate(paths, start=1):
        name = os.path.basename(path)
        y, sr = load_audio_file(path)
        feats, _ = compute_features(y, sr)
        if idx == 1:
            feats_first = feats
        feats_last = feats

        # Non-finite values become NaN so the plot shows a gap at that point.
        pause_s = feats.pause_total_s if math.isfinite(feats.pause_total_s) else np.nan
        pitch_hz = feats.pitch_median_hz if math.isfinite(feats.pitch_median_hz) else np.nan
        rms_m = feats.rms_mean if math.isfinite(feats.rms_mean) else np.nan
        active = feats.active_ratio if math.isfinite(feats.active_ratio) else np.nan

        rows.append([
            idx,
            name,
            human_seconds(feats.duration_s),
            int(feats.n_pauses),
            "" if not math.isfinite(pause_s) else f"{pause_s:.3f}",
            "" if not math.isfinite(pitch_hz) else f"{pitch_hz:.1f}",
            "" if not math.isfinite(rms_m) else f"{rms_m:.3f}",
            "" if not math.isfinite(active) else f"{active*100:.1f}%",
        ])
        pause_series.append(pause_s)
        pitch_series.append(pitch_hz)
        rms_series.append(rms_m)

    # Build the figure directly instead of via plt.figure(): pyplot keeps a
    # global reference to every figure it creates until plt.close() is called,
    # which would accumulate one figure per request in a long-running server.
    fig = plt.Figure(figsize=(10, 3.4))
    ax = fig.add_subplot(111)
    x = np.arange(1, len(rows) + 1)
    ax.plot(x, pause_series, marker="o", linewidth=1.2, label="Total pause time (s)")
    ax.plot(x, pitch_series, marker="o", linewidth=1.2, label="Median pitch (Hz)")
    ax.plot(x, rms_series, marker="o", linewidth=1.2, label="RMS mean")
    ax.set_title("Trend across recordings (same person)")
    ax.set_xlabel("Recording # (order)")
    ax.set_ylabel("Value (different scales)")
    ax.legend(loc="best")
    fig.tight_layout()

    summary = "### Summary not available."
    if feats_first is not None and feats_last is not None:
        summary = summary_of_changes(feats_first, feats_last)
    return rows, fig, "### Trend over time (within-person).", summary
def analyze_many_uploaded(files):
    """Callback for "Analyze uploaded timeline".

    Each uploaded item is reduced to a path: its ``name`` attribute when that
    is present and non-empty, otherwise ``str(item)``. The paths are then
    handed to ``analyze_many_paths``.
    """
    paths = [getattr(item, "name", None) or str(item) for item in (files or [])]
    return analyze_many_paths(paths)
def analyze_many_bundled(selected_filenames: List[str]):
    """Callback for "Analyze selected bundled".

    The selected file names are resolved against APP_DIR and passed to
    ``analyze_many_paths``; an empty or missing selection passes an empty list.
    """
    if not selected_filenames:
        return analyze_many_paths([])
    resolved = [os.path.join(APP_DIR, name) for name in selected_filenames]
    return analyze_many_paths(resolved)
def refresh_bundled():
    """Callback for "Refresh list".

    Re-scans the bundled audio and returns an update that replaces the
    checkbox choices and clears the selection, together with fresh
    diagnostics Markdown.
    """
    choices = list_bundled_audio()
    checkbox_update = gr.update(choices=choices, value=[])
    return checkbox_update, diagnostics_text()
# =========================================================
# UI
# =========================================================
# Custom stylesheet passed to gr.Blocks(css=...) in build_ui(): a dark gradient
# page background, light "card" panels (#header and .card), and tab labels
# coloured for readability on the dark background.
CSS = """
:root{ --bg: #0b0f19; --shadow: 0 12px 30px rgba(0,0,0,0.35); }
.gradio-container{
background:
radial-gradient(1200px 700px at 10% 10%, rgba(124,58,237,0.25), transparent 55%),
radial-gradient(900px 600px at 90% 20%, rgba(34,197,94,0.18), transparent 55%),
radial-gradient(1100px 800px at 40% 100%, rgba(59,130,246,0.15), transparent 60%),
var(--bg) !important;
}
#header{
background: rgba(255,255,255,0.92) !important;
color: #0b0f19 !important;
border: 1px solid rgba(0,0,0,0.08);
border-radius: 18px;
padding: 18px 18px 14px 18px;
box-shadow: var(--shadow);
}
#header *{ color: #0b0f19 !important; }
.card{
background: rgba(255,255,255,0.92) !important;
color: #0b0f19 !important;
border: 1px solid rgba(0,0,0,0.08);
border-radius: 18px;
padding: 14px;
box-shadow: var(--shadow);
}
.card *{ color: #0b0f19 !important; }
/* Tabs readable on dark background */
div[role="tablist"]{
background: rgba(255,255,255,0.06) !important;
border: 1px solid rgba(255,255,255,0.14) !important;
border-radius: 14px !important;
padding: 6px !important;
}
button[role="tab"]{ color: rgba(255,255,255,0.92) !important; }
button[role="tab"][aria-selected="true"]{
color: rgba(255,255,255,0.98) !important;
border-bottom: 2px solid rgba(255,255,255,0.65) !important;
}
"""
def build_ui():
    """Assemble the Gradio Blocks interface and return it without launching.

    The UI has three tabs: "Single" (analyse one recording), "Timeline"
    (compare several uploaded or bundled recordings) and "Diagnostics"
    (environment / bundled-file report).
    """
    # NOTE(review): the nesting of the `with` blocks below was reconstructed
    # from a source whose indentation had been lost -- confirm the layout.
    # Snapshot of the bundled files used as the initial checkbox choices.
    bundled0 = list_bundled_audio()
    with gr.Blocks(
        css=CSS,
        theme=gr.themes.Soft(primary_hue="violet", secondary_hue="emerald"),
        title="Explainable Speech Analytics (Demo)",
    ) as demo:
        gr.HTML(
            """
<div id="header">
<h2 style="margin:0">Explainable Speech Analytics</h2>
<p style="margin-top:8px; margin-bottom:0">
Demo: measurable speech signals (pauses, pitch, energy). No diagnosis.
</p>
</div>
"""
        )
        with gr.Tabs():
            # --- Tab 1: single-recording analysis ---
            with gr.TabItem("Single"):
                with gr.Row():
                    with gr.Column(scale=5):
                        audio = gr.Audio(label="Audio", sources=["upload", "microphone"], type="filepath")
                        run = gr.Button("Analyze", variant="primary")
                    with gr.Column(scale=7):
                        feats_df = gr.Dataframe(headers=["Feature", "Value"], interactive=False, wrap=True)
                        wf_plot = gr.Plot(label="Waveform + pauses")
                        pitch_plot = gr.Plot(label="Pitch")
                expl = gr.Markdown("### Upload or record audio to start.", elem_classes=["card"])
                run.click(analyze_one, inputs=[audio], outputs=[feats_df, wf_plot, pitch_plot, expl])
            # --- Tab 2: multi-recording timeline ---
            with gr.TabItem("Timeline"):
                with gr.Row():
                    with gr.Column(scale=5):
                        gr.Markdown("#### Option A — Upload")
                        files = gr.Files(label="Upload multiple audio files", file_count="multiple", file_types=["audio"])
                        run_up = gr.Button("Analyze uploaded timeline", variant="primary")
                        gr.Markdown("#### Option B — Bundled samples (repo root)")
                        bundled_select = gr.CheckboxGroup(choices=bundled0, label="Bundled audio files")
                        with gr.Row():
                            refresh_btn = gr.Button("Refresh list", variant="secondary")
                            run_b = gr.Button("Analyze selected bundled", variant="secondary")
                    with gr.Column(scale=7):
                        timeline_df = gr.Dataframe(
                            headers=["#", "File", "Duration", "Pauses", "Pause(s)", "Pitch(Hz)", "RMS", "Active %"],
                            interactive=False,
                            wrap=True,
                        )
                        timeline_plot = gr.Plot(label="Trend plot")
                timeline_expl = gr.Markdown("### Select at least 2 recordings.", elem_classes=["card"])
                timeline_summary = gr.Markdown("### Summary will appear here.", elem_classes=["card"])
                # Both buttons write to the same four timeline outputs.
                run_up.click(analyze_many_uploaded, inputs=[files], outputs=[timeline_df, timeline_plot, timeline_expl, timeline_summary])
                run_b.click(analyze_many_bundled, inputs=[bundled_select], outputs=[timeline_df, timeline_plot, timeline_expl, timeline_summary])
            # --- Tab 3: diagnostics ---
            with gr.TabItem("Diagnostics"):
                diag = gr.Markdown(diagnostics_text(), elem_classes=["card"])
                diag_refresh = gr.Button("Refresh diagnostics", variant="secondary")
                diag_refresh.click(lambda: diagnostics_text(), inputs=None, outputs=[diag])
        # Wired last because it also updates `diag`, which is only created in
        # the Diagnostics tab above.
        refresh_btn.click(refresh_bundled, inputs=None, outputs=[bundled_select, diag])
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue, and serve on
    # all interfaces.
    demo = build_ui()
    demo.queue(max_size=32)
    # Port precedence: PORT, then GRADIO_SERVER_PORT, then 7860.
    fallback_port = os.environ.get("GRADIO_SERVER_PORT", "7860")
    port = int(os.environ.get("PORT", fallback_port))
    demo.launch(server_name="0.0.0.0", server_port=port)