| |
| from pathlib import Path |
| import numpy as np, pandas as pd |
| import soundfile as sf |
| import librosa, time, io, os, json, re |
|
|
| |
_WHISPER = None  # process-wide single-slot model cache


def get_whisper(model_size="small", device="cpu", compute_type="int8"):
    """Return a process-wide cached faster-whisper model.

    The model is loaded lazily on first use; every later call returns the
    same instance and IGNORES the arguments (single-slot cache), so choose
    the configuration before the first call.

    Args:
        model_size: Whisper checkpoint name (default "small").
        device: inference device for CTranslate2 (default "cpu").
        compute_type: quantization mode (default "int8").

    Returns:
        A `faster_whisper.WhisperModel` instance.
    """
    global _WHISPER
    if _WHISPER is None:
        # Imported lazily so this module can be imported without
        # faster-whisper installed (ASR is only needed by analyze()).
        from faster_whisper import WhisperModel
        _WHISPER = WhisperModel(model_size, device=device, compute_type=compute_type)
    return _WHISPER
|
|
| |
def to_mono_16k(wave, sr):
    """Downmix to mono, resample to 16 kHz, and peak-normalize to 0.95.

    Returns the processed float32 waveform together with its sample rate
    (always 16000 after this call when resampling was needed).
    """
    # soundfile-style multi-channel audio is (frames, channels): average
    # the channels to get a mono signal.
    if isinstance(wave, np.ndarray) and wave.ndim > 1:
        wave = wave.mean(axis=1)

    if sr != 16000:
        wave = librosa.resample(wave.astype(np.float32), orig_sr=sr, target_sr=16000)
        sr = 16000

    wave = wave.astype(np.float32)
    # Scale so the loudest sample sits at 0.95 full scale; the epsilon
    # keeps silent input from dividing by zero.
    peak = np.abs(wave).max() + 1e-9
    return wave / peak * 0.95, sr
|
|
| |
def vad_segments_librosa(y, sr, hop_ms=20, frame_ms=50, thresh_db=-40.0):
    """Energy-threshold VAD: return merged (start_s, end_s) speech spans.

    Runs on a 2x-decimated copy of the signal for speed. Frames whose RMS
    level exceeds `thresh_db` are marked active; active runs shorter than
    150 ms are discarded, and gaps under 50 ms are merged.
    """
    decimated = y[::2]
    sr_dec = sr // 2
    hop = int(sr_dec * hop_ms / 1000.0)
    frame = int(sr_dec * frame_ms / 1000.0)

    energy = librosa.feature.rms(y=decimated, frame_length=frame, hop_length=hop, center=True)[0]
    energy_db = 20.0 * np.log10(energy + 1e-12)  # epsilon avoids log(0)
    active = energy_db > thresh_db

    # Collect active runs that last at least 150 ms.
    raw, seg_start = [], None
    for idx, is_on in enumerate(active):
        now = idx * hop / sr_dec
        if is_on and seg_start is None:
            seg_start = now
        elif (not is_on) and seg_start is not None:
            if now - seg_start >= 0.15:
                raw.append((seg_start, now))
            seg_start = None
    if seg_start is not None:
        tail = len(active) * hop / sr_dec
        if tail - seg_start >= 0.15:
            raw.append((seg_start, tail))

    # Merge neighbouring runs separated by less than 50 ms.
    merged = []
    for s, e in raw:
        if merged and s - merged[-1][1] < 0.05:
            merged[-1] = (merged[-1][0], e)
        else:
            merged.append((s, e))
    return merged
|
|
| |
def vad_segments_webrtc(y, sr, aggressiveness=2):
    """WebRTC VAD: return merged (start_s, end_s) speech spans.

    Feeds non-overlapping 30 ms int16 frames to webrtcvad. Speech runs
    shorter than 150 ms are discarded and gaps under 50 ms merged.
    Raises ImportError when webrtcvad is not installed (the caller falls
    back to the RMS-threshold VAD).
    """
    import webrtcvad
    detector = webrtcvad.Vad(aggressiveness)
    frame_ms = 30
    samples_per_frame = int(sr * frame_ms / 1000)
    step = frame_ms / 1000.0  # seconds per frame

    # Convert float waveform to 16-bit PCM, clipping at full scale.
    pcm16 = np.clip(y * 32768.0, -32768, 32767).astype(np.int16)

    flags = [
        detector.is_speech(
            pcm16[k * samples_per_frame:(k + 1) * samples_per_frame].tobytes(),
            sample_rate=sr,
        )
        for k in range(len(pcm16) // samples_per_frame)
    ]

    # Collect speech runs lasting at least 150 ms.
    raw, seg_start = [], None
    for k, is_speech in enumerate(flags):
        if is_speech and seg_start is None:
            seg_start = k * step
        elif (not is_speech) and seg_start is not None:
            end = k * step
            if end - seg_start >= 0.15:
                raw.append((seg_start, end))
            seg_start = None
    if seg_start is not None:
        end = len(flags) * step
        if end - seg_start >= 0.15:
            raw.append((seg_start, end))

    # Merge neighbouring runs separated by less than 50 ms.
    merged = []
    for s, e in raw:
        if merged and s - merged[-1][1] < 0.05:
            merged[-1] = (merged[-1][0], e)
        else:
            merged.append((s, e))
    return merged
|
|
| |
def compute_rms_db(y, sr, frame_len, hop_len):
    """Frame-wise RMS level in dB, with matching frame times in seconds."""
    frames = librosa.feature.rms(y=y, frame_length=frame_len, hop_length=hop_len, center=True)[0]
    db = 20.0 * np.log10(frames + 1e-12)  # epsilon avoids log(0)
    times = np.arange(len(db)) * hop_len / sr
    return times, db
|
|
def compute_f0_yin(y, sr, frame_length=1024, hop_length=320, fmin=50.0, fmax=600.0):
    """Frame-wise fundamental frequency via YIN, with frame times in seconds."""
    track = librosa.yin(y, fmin=fmin, fmax=fmax, sr=sr,
                        frame_length=frame_length, hop_length=hop_length)
    times = np.arange(len(track)) * hop_length / sr
    return times, track
|
|
def local_mean(series_t, series_y, t0, t1):
    """Mean of `series_y` over samples whose time lies in the closed window [t0, t1].

    NaN values inside the window are ignored. Returns NaN when the window is
    degenerate (t1 <= t0), contains no samples, or contains only NaNs.

    Args:
        series_t: 1-D array of sample times (seconds).
        series_y: 1-D array of values aligned with `series_t`.
        t0, t1: window bounds in seconds.

    Returns:
        float mean, or NaN when no valid data falls in the window.
    """
    if t1 <= t0:
        return np.nan
    mask = (series_t >= t0) & (series_t <= t1)
    if not np.any(mask):
        return np.nan
    vals = series_y[mask]
    # np.nanmean on an all-NaN slice emits a RuntimeWarning before returning
    # NaN — short-circuit that case so the pipeline stays warning-free.
    if np.all(np.isnan(vals)):
        return np.nan
    return float(np.nanmean(vals))
|
|
| |
# Latin + Cyrillic vowels; each maximal vowel run approximates one syllable.
_VOWELS = re.compile(r"[aeiouyаеёиоуыэюяAEIOUYАЕЁИОУЫЭЮЯ]+")


def estimate_syllables(word: str) -> int:
    """Rough syllable count: number of vowel groups, never less than 1."""
    groups = _VOWELS.findall(word or "")
    return len(groups) if groups else 1
|
|
| |
def analyze(audio_bytes: bytes, filename_hint: str = "audio.wav"):
    """Run the full prosody-analysis pipeline on an in-memory audio file.

    Pipeline: decode -> mono/16 kHz -> VAD -> faster-whisper ASR (Russian,
    word timestamps) -> per-word features (F0, RMS, pauses, local syllable
    rate, accent score) -> aggregate stats. Writes `<stem>_analysis.json`
    in the current directory.

    Args:
        audio_bytes: raw bytes of any soundfile-readable audio container.
        filename_hint: only used to derive the output JSON file name.

    Returns:
        (json_path, info) — path of the written JSON report and a summary
        dict with duration, segment/word counts, VAD method and elapsed time.
    """
    t0 = time.time()
    data, sr = sf.read(io.BytesIO(audio_bytes))
    # FIX: the original ternary had identical branches; sf.read returns
    # (frames,) or (frames, channels) and to_mono_16k handles both shapes.
    y, sr = to_mono_16k(data.astype(np.float32), sr)

    # Prefer the WebRTC VAD; fall back to the RMS threshold when webrtcvad
    # is missing or fails.
    try:
        segments = vad_segments_webrtc(y, sr); method_used = "webrtcvad"
    except Exception:
        segments = vad_segments_librosa(y, sr); method_used = "rms-threshold"

    duration = len(y) / sr
    if not segments:
        # No speech detected: transcribe the whole clip rather than nothing.
        segments = [(0.0, duration)]
        method_used += " (fallback-full)"

    asr = get_whisper()
    words_raw = []
    for seg_id, (s, e) in enumerate(segments):
        seg = y[int(s * sr):int(e * sr)]
        if len(seg) == 0:
            continue
        segments_gen, _info = asr.transcribe(seg, language="ru", word_timestamps=True)
        for segm in segments_gen:
            for w in segm.words:
                words_raw.append({
                    "segment_id": seg_id,
                    "word": (w.word or "").strip(),
                    # Whisper timestamps are relative to the clip passed in;
                    # shift by the segment start to get global time.
                    "t_start": float(s + (w.start or 0.0)),
                    "t_end": float(s + (w.end or 0.0)),
                    "prob": float(getattr(w, "probability", np.nan))
                })

    base = Path(filename_hint).stem or "audio"
    out_json = Path(f"{base}_analysis.json")

    if not words_raw:
        payload = {"version": "1.2", "aggregates": {}, "words": [], "transcript": ""}
        out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        # FIX: report the real duration and segment count instead of
        # hardcoded zeros; include the VAD method for parity with the
        # normal return path.
        return str(out_json), {
            "duration_sec": round(duration, 2),
            "segments": len(segments),
            "words": 0,
            "vad": method_used,
            "elapsed_sec": round(time.time() - t0, 2),
        }

    # Frame-level prosody tracks: 20 ms hop, 50 ms analysis window.
    hop_len = int(0.020 * sr)
    frame_len = int(0.050 * sr)
    t_rms, rms_db = compute_rms_db(y, sr, frame_len=frame_len, hop_len=hop_len)
    t_f0, f0 = compute_f0_yin(y, sr, frame_length=1024, hop_length=hop_len, fmin=50.0, fmax=600.0)
    # NOTE(review): librosa.yin returns an estimate for every frame (no
    # NaNs), so this mask is usually all-True; kept for robustness.
    voiced_mask = ~np.isnan(f0)

    # Crude jitter/shimmer proxies: mean absolute frame-to-frame delta
    # normalized by the mean level. Not the clinical cycle-based measures.
    jitter_voice = float(np.nanmean(np.abs(np.diff(f0))) / np.nanmean(f0)) if np.nanmean(f0) > 0 else None
    shimmer_voice = float(np.nanmean(np.abs(np.diff(rms_db))) / (abs(np.nanmean(rms_db)) + 1e-9)) if np.nanmean(rms_db) != 0 else None

    rows = []
    prev_end = {}
    for w in words_raw:
        seg_id = w["segment_id"]; t0w, t1w = w["t_start"], w["t_end"]
        if t1w < t0w:
            t1w = t0w  # guard against rare inverted whisper timestamps
        # FIX: measure the first word's pause from its segment start, not
        # from t=0 — the old default inflated pauses in later segments.
        pre_pause = max(0.0, t0w - prev_end.get(seg_id, segments[seg_id][0]))
        prev_end[seg_id] = t1w

        f0_mean_local = local_mean(t_f0, f0, t0w, t1w)
        rms_mean_local = local_mean(t_rms, rms_db, t0w, t1w)
        dur = max(0.0, t1w - t0w)

        syl = estimate_syllables(w["word"])
        sylps = (syl / dur) if dur > 1e-6 else np.nan

        rows.append({
            "start_s": round(t0w, 3),
            "end_s": round(t1w, 3),
            "duration_s": round(dur, 3),
            "pre_pause_ms": int(round(pre_pause * 1000)),
            "word": w["word"],
            "prob": round(w["prob"], 4) if not np.isnan(w["prob"]) else None,
            "f0_mean_hz": round(f0_mean_local, 2) if not np.isnan(f0_mean_local) else None,
            "rms_dbfs": round(rms_mean_local, 2) if not np.isnan(rms_mean_local) else None,
            "speed_local_sylps": round(sylps, 3) if not np.isnan(sylps) else None
        })

    df = pd.DataFrame(rows)

    def zscore(col):
        # Population z-score, NaN-tolerant; epsilon guards constant columns.
        x = df[col].astype(float)
        mu = np.nanmean(x); sd = np.nanstd(x) + 1e-9
        return (x - mu) / sd

    df["f0_z"] = zscore("f0_mean_hz")
    df["rms_z"] = zscore("rms_dbfs")
    df["duration_z"] = zscore("duration_s")

    # Accent score: louder-than-average and pitch-deviant words score high;
    # logistic squash maps the weighted sum into (0, 1).
    acc_raw = 0.6 * df["rms_z"].fillna(0.0) + 0.4 * df["f0_z"].abs().fillna(0.0)
    df["accent_score"] = 1.0 / (1.0 + np.exp(-acc_raw))
    df["accent_flag"] = (df["accent_score"] >= 0.6).astype(int)

    words_count = int(df.shape[0])
    speech_rate_wps = float(words_count / duration) if duration > 0 else 0.0

    f0_vals = df["f0_mean_hz"].astype(float).dropna()
    f0_mean = float(np.mean(f0_vals)) if not f0_vals.empty else None
    f0_med = float(np.median(f0_vals)) if not f0_vals.empty else None
    f0_std = float(np.std(f0_vals)) if not f0_vals.empty else None
    f0_stability = float(max(0.0, 1.0 - (f0_std / (f0_mean + 1e-9)))) if f0_mean else None

    rms_dbfs_mean = float(np.nanmean(df["rms_dbfs"])) if df["rms_dbfs"].notna().any() else None
    # YIN provides no per-frame confidence; constant kept for schema stability.
    pitch_conf_mean = 1.0

    # Replace NaN with None so json.dumps emits valid `null` tokens.
    df = df.replace({np.nan: None})

    transcript = " ".join(df["word"].tolist()).replace(" ,", ",").replace(" .", ".")
    # FIX: reuse voiced_mask instead of recomputing ~np.isnan(f0) twice.
    voiced_sec = float(voiced_mask.sum()) * (hop_len / sr)
    payload = {
        "version": "1.2",
        "aggregates": {
            "duration_sec": round(duration, 2),
            "voiced_duration_sec": round(voiced_sec, 2),
            "voiced_ratio": float(np.clip(voiced_sec / duration if duration > 0 else 0.0, 0, 1)),
            "rms_dbfs_mean": rms_dbfs_mean,
            "f0_mean_hz": f0_mean,
            "f0_median_hz": f0_med,
            "f0_std_hz": f0_std,
            "f0_stability": f0_stability,
            "pitch_confidence_mean": pitch_conf_mean,
            "jitter_voice": jitter_voice,
            "shimmer_voice": shimmer_voice,
            "words_count": words_count,
            "speech_rate_wps": round(speech_rate_wps, 2),
            "vad": method_used
        },
        "words": df.to_dict(orient="records"),
        "transcript": transcript
    }

    out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")

    info = {
        "duration_sec": round(duration, 2),
        "segments": len(segments),
        "words": words_count,
        "vad": method_used,
        "elapsed_sec": round(time.time() - t0, 2)
    }
    return str(out_json), info
|
|