# AIAngel2 / analyzer.py — airosss, "Update analyzer.py", commit 6df653b (verified)
# analyzer.py (v1.2.1 — auto-clean NaN before JSON serialization)
from pathlib import Path
import numpy as np, pandas as pd
import soundfile as sf
import librosa, time, io, os, json, re
# ====== Whisper model cache ======
_WHISPER = None


def get_whisper():
    """Return the process-wide faster-whisper model, loading it lazily once."""
    global _WHISPER
    if _WHISPER is None:
        # Imported here so the module loads even without faster_whisper installed.
        from faster_whisper import WhisperModel
        # CPU + int8 quantization keeps the memory footprint small.
        _WHISPER = WhisperModel("small", device="cpu", compute_type="int8")
    return _WHISPER
# ====== Audio preparation ======
def to_mono_16k(wave, sr):
    """Collapse to mono, resample to 16 kHz, and peak-normalize to 0.95."""
    if isinstance(wave, np.ndarray) and wave.ndim > 1:
        wave = np.mean(wave, axis=1)
    if sr != 16000:
        wave = librosa.resample(wave.astype(np.float32), orig_sr=sr, target_sr=16000)
        sr = 16000
    mono = wave.astype(np.float32)
    # The +1e-9 guards against division by zero on fully silent input.
    peak = np.max(np.abs(mono)) + 1e-9
    return mono / peak * 0.95, sr
# ====== Simple RMS-threshold VAD ======
def vad_segments_librosa(y, sr, hop_ms=20, frame_ms=50, thresh_db=-40.0):
    """Energy-threshold VAD: return merged (start, end) speech spans in seconds.

    Frames whose RMS exceeds `thresh_db` (dBFS) count as speech; spans shorter
    than 150 ms are dropped and spans closer than 50 ms are merged.
    """
    # Work on a 2x-decimated copy to halve the RMS computation cost.
    y_ds = y[::2]
    sr_ds = sr // 2
    hop = int(sr_ds * hop_ms / 1000.0)
    frame = int(sr_ds * frame_ms / 1000.0)
    energy = librosa.feature.rms(y=y_ds, frame_length=frame, hop_length=hop, center=True)[0]
    loud = (20.0 * np.log10(energy + 1e-12)) > thresh_db

    spans, begin = [], None
    for idx, flag in enumerate(loud):
        now = idx * hop / sr_ds
        if flag and begin is None:
            begin = now
        elif (not flag) and begin is not None:
            if now - begin >= 0.15:
                spans.append((begin, now))
            begin = None
    if begin is not None:
        tail = len(loud) * hop / sr_ds
        if tail - begin >= 0.15:
            spans.append((begin, tail))

    # Merge spans separated by less than 50 ms of silence.
    merged = []
    for s, e in spans:
        if merged and s - merged[-1][1] < 0.05:
            merged[-1] = (merged[-1][0], e)
        else:
            merged.append((s, e))
    return merged
# ====== webrtcvad ======
def vad_segments_webrtc(y, sr, aggressiveness=2):
    """WebRTC-VAD speech spans in seconds.

    Raises ImportError when webrtcvad is not installed (the caller falls back
    to the RMS-threshold VAD). Same post-processing as the RMS VAD: spans
    under 150 ms are dropped, gaps under 50 ms are merged.
    """
    import webrtcvad
    detector = webrtcvad.Vad(aggressiveness)
    frame_ms = 30
    samples_per_frame = int(sr * frame_ms / 1000)
    step = frame_ms / 1000.0
    # webrtcvad consumes 16-bit PCM bytes.
    pcm = np.clip(y * 32768.0, -32768, 32767).astype(np.int16)
    flags = [
        detector.is_speech(
            pcm[i * samples_per_frame:(i + 1) * samples_per_frame].tobytes(),
            sample_rate=sr,
        )
        for i in range(len(pcm) // samples_per_frame)
    ]

    spans, begin = [], None
    for i, speech in enumerate(flags):
        now = i * step
        if speech and begin is None:
            begin = now
        elif (not speech) and begin is not None:
            if now - begin >= 0.15:
                spans.append((begin, now))
            begin = None
    if begin is not None:
        tail = len(flags) * step
        if tail - begin >= 0.15:
            spans.append((begin, tail))

    merged = []
    for s, e in spans:
        if merged and s - merged[-1][1] < 0.05:
            merged[-1] = (merged[-1][0], e)
        else:
            merged.append((s, e))
    return merged
# ====== RMS and F0 tracks ======
def compute_rms_db(y, sr, frame_len, hop_len):
    """Frame-wise RMS energy in dBFS plus the matching time axis in seconds."""
    energy = librosa.feature.rms(y=y, frame_length=frame_len, hop_length=hop_len, center=True)[0]
    db = 20.0 * np.log10(energy + 1e-12)  # +1e-12 avoids log(0)
    times = np.arange(len(db)) * hop_len / sr
    return times, db
def compute_f0_yin(y, sr, frame_length=1024, hop_length=320, fmin=50.0, fmax=600.0):
    """Pitch track via the YIN estimator; returns (times_sec, f0_hz) per frame."""
    track = librosa.yin(y, fmin=fmin, fmax=fmax, sr=sr,
                        frame_length=frame_length, hop_length=hop_length)
    times = np.arange(len(track)) * hop_length / sr
    return times, track
def local_mean(series_t, series_y, t0, t1):
    """NaN-aware mean of series_y over the closed time window [t0, t1].

    Returns NaN when the window is degenerate (t1 <= t0), contains no
    samples, or contains only NaN samples. The original called np.nanmean
    directly, which emits a RuntimeWarning on an all-NaN slice; filtering
    the NaNs first gives the same value without the warning.
    """
    if t1 <= t0:
        return np.nan
    window = series_y[(series_t >= t0) & (series_t <= t1)]
    finite = window[~np.isnan(window)]
    if finite.size == 0:
        return np.nan
    return float(finite.mean())
# ====== syllable estimate for speech rate ======
_VOWELS = re.compile(r"[aeiouyаеёиоуыэюяAEIOUYАЕЁИОУЫЭЮЯ]+")


def estimate_syllables(word: str) -> int:
    """Approximate the syllable count as the number of vowel runs (minimum 1)."""
    runs = _VOWELS.findall(word or "")
    return len(runs) if runs else 1
# ====== MAIN ======
def analyze(audio_bytes: bytes, filename_hint: str = "audio.wav"):
    """Full pipeline: decode -> VAD -> Whisper ASR -> per-word prosody table.

    Writes "<stem>_analysis.json" to the current directory and returns
    (json_path, info_dict).

    Fixes vs v1.2.1:
    - the first word of each VAD segment measured its pre-pause from t=0
      (an absolute timestamp) instead of from the segment start;
    - the empty-transcript path reported hard-coded duration/segment zeros.
    """
    t_begin = time.time()
    data, sr = sf.read(io.BytesIO(audio_bytes))
    # The original had an if/else whose two branches were identical; a plain
    # cast is all that was ever performed.
    y = data.astype(np.float32)
    y, sr = to_mono_16k(y, sr)

    # Prefer webrtcvad; fall back to the RMS-threshold VAD when it is missing
    # or fails.
    try:
        segments = vad_segments_webrtc(y, sr)
        method_used = "webrtcvad"
    except Exception:
        segments = vad_segments_librosa(y, sr)
        method_used = "rms-threshold"
    duration = len(y) / sr
    if not segments:
        # No speech detected: transcribe the whole file rather than nothing.
        segments = [(0.0, duration)]
        method_used += " (fallback-full)"

    # ---- ASR with word timestamps ----
    asr = get_whisper()
    words_raw = []
    for seg_id, (s, e) in enumerate(segments):
        seg = y[int(s * sr):int(e * sr)]
        if len(seg) == 0:
            continue
        segments_gen, _info = asr.transcribe(seg, language="ru", word_timestamps=True)
        for segm in segments_gen:
            for w in segm.words:
                words_raw.append({
                    "segment_id": seg_id,
                    "word": (w.word or "").strip(),
                    # Whisper times are clip-relative; shift to file time.
                    "t_start": float(s + (w.start or 0.0)),
                    "t_end": float(s + (w.end or 0.0)),
                    "prob": float(getattr(w, "probability", np.nan)),
                })

    base = Path(filename_hint).stem or "audio"
    out_json = Path(f"{base}_analysis.json")
    if not words_raw:
        payload = {"version": "1.2", "aggregates": {}, "words": [], "transcript": ""}
        out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        # FIX: report the real duration and segment count instead of zeros.
        return str(out_json), {"duration_sec": round(duration, 2),
                               "segments": len(segments), "words": 0}

    # ---- frame-level prosody tracks (20 ms hop, 50 ms RMS window) ----
    hop_len = int(0.020 * sr)
    frame_len = int(0.050 * sr)
    t_rms, rms_db = compute_rms_db(y, sr, frame_len=frame_len, hop_len=hop_len)
    t_f0, f0 = compute_f0_yin(y, sr, frame_length=1024, hop_length=hop_len, fmin=50.0, fmax=600.0)

    # ---- crude whole-track jitter/shimmer proxies ----
    f0_mean_all = np.nanmean(f0)
    jitter_voice = (float(np.nanmean(np.abs(np.diff(f0))) / f0_mean_all)
                    if f0_mean_all > 0 else None)
    rms_mean_all = np.nanmean(rms_db)
    shimmer_voice = (float(np.nanmean(np.abs(np.diff(rms_db))) / (abs(rms_mean_all) + 1e-9))
                     if rms_mean_all != 0 else None)

    # ---- per-word table ----
    rows = []
    prev_end = {}  # seg_id -> end time of the previous word in that segment
    for w in words_raw:
        seg_id = w["segment_id"]
        t0w, t1w = w["t_start"], w["t_end"]
        if t1w < t0w:
            t1w = t0w
        # FIX: default to the segment's own start, not 0.0, so the first word
        # of a segment reports its pause within the segment rather than its
        # absolute timestamp.
        pre_pause = max(0.0, t0w - prev_end.get(seg_id, segments[seg_id][0]))
        prev_end[seg_id] = t1w
        f0_mean_local = local_mean(t_f0, f0, t0w, t1w)
        rms_mean_local = local_mean(t_rms, rms_db, t0w, t1w)
        dur = max(0.0, t1w - t0w)
        syl = estimate_syllables(w["word"])
        sylps = (syl / dur) if dur > 1e-6 else np.nan
        rows.append({
            "start_s": round(t0w, 3),
            "end_s": round(t1w, 3),
            "duration_s": round(dur, 3),
            "pre_pause_ms": int(round(pre_pause * 1000)),
            "word": w["word"],
            "prob": round(w["prob"], 4) if not np.isnan(w["prob"]) else None,
            "f0_mean_hz": round(f0_mean_local, 2) if not np.isnan(f0_mean_local) else None,
            "rms_dbfs": round(rms_mean_local, 2) if not np.isnan(rms_mean_local) else None,
            "speed_local_sylps": round(sylps, 3) if not np.isnan(sylps) else None,
        })
    df = pd.DataFrame(rows)

    def zscore(col):
        # NaN-aware z-score; +1e-9 keeps a constant column from dividing by 0.
        x = df[col].astype(float)
        mu = np.nanmean(x)
        sd = np.nanstd(x) + 1e-9
        return (x - mu) / sd

    df["f0_z"] = zscore("f0_mean_hz")
    df["rms_z"] = zscore("rms_dbfs")
    df["duration_z"] = zscore("duration_s")
    # Accent score: louder-than-average plus pitch deviation, squashed to (0, 1).
    acc_raw = 0.6 * df["rms_z"].fillna(0.0) + 0.4 * df["f0_z"].abs().fillna(0.0)
    df["accent_score"] = 1.0 / (1.0 + np.exp(-acc_raw))
    df["accent_flag"] = (df["accent_score"] >= 0.6).astype(int)

    # ---- aggregates ----
    words_count = int(df.shape[0])
    speech_rate_wps = float(words_count / duration) if duration > 0 else 0.0
    f0_vals = df["f0_mean_hz"].astype(float).dropna()
    f0_mean = float(np.mean(f0_vals)) if not f0_vals.empty else None
    f0_med = float(np.median(f0_vals)) if not f0_vals.empty else None
    f0_std = float(np.std(f0_vals)) if not f0_vals.empty else None
    f0_stability = float(max(0.0, 1.0 - (f0_std / (f0_mean + 1e-9)))) if f0_mean else None
    rms_dbfs_mean = float(np.nanmean(df["rms_dbfs"])) if df["rms_dbfs"].notna().any() else None
    # NOTE(review): YIN provides no confidence measure; this is a placeholder.
    pitch_conf_mean = 1.0
    voiced_frames = int((~np.isnan(f0)).sum())

    # ---- NaN cleanup: JSON has no NaN literal, so replace with None ----
    df = df.replace({np.nan: None})

    # ---- JSON payload ----
    transcript = " ".join(df["word"].tolist()).replace(" ,", ",").replace(" .", ".")
    payload = {
        "version": "1.2",
        "aggregates": {
            "duration_sec": round(duration, 2),
            "voiced_duration_sec": round(voiced_frames * (hop_len / sr), 2),
            "voiced_ratio": float(np.clip(
                (voiced_frames * hop_len / sr) / duration if duration > 0 else 0.0, 0, 1)),
            "rms_dbfs_mean": rms_dbfs_mean,
            "f0_mean_hz": f0_mean,
            "f0_median_hz": f0_med,
            "f0_std_hz": f0_std,
            "f0_stability": f0_stability,
            "pitch_confidence_mean": pitch_conf_mean,
            "jitter_voice": jitter_voice,
            "shimmer_voice": shimmer_voice,
            "words_count": words_count,
            "speech_rate_wps": round(speech_rate_wps, 2),
            "vad": method_used,
        },
        "words": df.to_dict(orient="records"),
        "transcript": transcript,
    }
    out_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    info = {
        "duration_sec": round(duration, 2),
        "segments": len(segments),
        "words": words_count,
        "vad": method_used,
        "elapsed_sec": round(time.time() - t_begin, 2),
    }
    return str(out_json), info