Spaces:
Running
Running
No-fiddle: zaszyty łańcuch agy (mono+HPF75+LUFS-18+peak-1dB) w postproc.py; front=kroki+normalizacja; piny scipy/pyloudnorm
a6791a4 verified | """postproc.py — czyszczenie + normalizacja próbek głosu PO AudioSR. | |
| Czysty DSP: numpy / scipy / soundfile / pyloudnorm. BEZ gradio → moduł da się | |
| testować lokalnie (jak viz.py). app.py woła JEDNO wejście: process(...). | |
| Kontekst: próbki służą jako REFERENCJA do klonowania głosu (RVC/Applio + zero-shot | |
| TTS OmniVoice). Kolejność całego pipeline'u (potwierdzona przez red-team Gemini 3.1 Pro): | |
| (opcjonalny zewnętrzny UVR5 de-reverb) → AudioSR (super-rozdzielczość) → | |
| czyszczenie w stylu Audacity → normalizacja. | |
| AudioSR MUSI poprzedzać normalizację (normalizacja przed super-res wstrzykuje | |
| artefakty w wierzchołki). Ten moduł obsługuje DWA ostatnie etapy: czyszczenie + | |
| DWUSTOPNIOWĄ normalizację (LUFS → sufit szczytu), na sygnale już po AudioSR. | |
| ──────────────────────────────────────────────────────────────────────────── | |
| CO ROBIMY, gdy normalizacja WŁĄCZONA (kolejność istotna): | |
| 1. force-mono — embeddery (hubert / contentvec / spin-v2) oczekują mono; | |
| stereo grozi kasowaniem fazy (phase cancellation). | |
| 2. usunięcie DC — odjęcie składowej stałej (średniej) przed filtrem. | |
| 3. high-pass ~75 Hz — Butterworth rz. 2, ZERO-PHASE (filtfilt) → usuwa rumble/ | |
| szum sieciowy/resztki DC, NIE rusza barwy głosu. filtfilt | |
| (dwukierunkowo) nie przesuwa fazy → brak rozmycia transjentów. | |
| 4. (opcjonalnie) łagodny WARUNKOWY de-esser — patrz DEESS_ENABLED niżej. | |
| 5. LUFS normalize — BS.1770-4 (pyloudnorm Meter, K-weighting) do stałego celu | |
| (TARGET_LUFS = -18). Podnosi PERCEPCYJNĄ głośność/gęstość, | |
| żeby model uczył się barwy także z cichszych fragmentów. | |
| 6. sufit szczytu -1 dB — TYLKO redukcja wzmocnienia (gain-down), gdy szczyt > -1 dB. | |
| ŻADNEJ kompresji/limitera/clipingu — czysta zmiana skali. | |
| Gdy normalizacja WYŁĄCZONA: zachowanie jak dotychczas — dopasowanie poziomu RMS | |
| wyjścia do oryginału + bezpieczny współczynnik 0.99 (bez high-passu / LUFS). | |
| ──────────────────────────────────────────────────────────────────────────── | |
| ŚWIADOMIE POMINIĘTE (każde z UZASADNIENIEM — patrz komentarze przy stałych): | |
| • ślepa spektralna redukcja szumu — RED-TEAM: #1 zabójca barwy; nie da się | |
| wiarygodnie auto-profilować szumu w ciemno. | |
| • ślepe wycięcie rezonansu EQ — szkodliwe na czystym, już-dobrym audio. | |
| • UVR5 de-reverb — narzędzie ZEWNĘTRZNE, nie wbudowane w apkę. | |
| • ręczne usuwanie klików/plozji — nieautomatyzowalne bezpiecznie (wymaga ucha). | |
| • kompresja dynamiki / limiter — spłaszcza dynamikę → monotonny TTS. | |
| • noise gate — zabija transjenty końców wyrazów. | |
| • boost EQ — podbija sybilanty (syki). | |
| Źródłem prawdy o jakości jest jakość NAGRANIA (cichy pokój) > każdy post-processing. | |
| """ | |
| import numpy as np | |
| import soundfile as sf | |
| # scipy i pyloudnorm to zależności ścieżki ON (high-pass / LUFS). Import MIĘKKI: | |
| # gdyby koło scipy zostało zbudowane pod inne ABI niż przypięty numpy==1.23.5 | |
| # (resolver drift), twardy import na poziomie modułu wywaliłby CAŁY Space na starcie | |
| # (app.py importuje postproc BEZ osłony). Bez scipy degradujemy do „DC-only" (bez HPF), | |
| # bez pyloudnorm do RMS-fallbacku LUFS — apka działa dalej, nie pada. | |
| try: | |
| from scipy.signal import butter, filtfilt, sosfiltfilt | |
| _HAVE_SCIPY = True | |
| except Exception: # pragma: no cover - środowisko bez scipy | |
| butter = filtfilt = sosfiltfilt = None | |
| _HAVE_SCIPY = False | |
| try: | |
| import pyloudnorm as pyln | |
| _HAVE_PYLN = True | |
| except Exception: # pragma: no cover - środowisko bez pyloudnorm | |
| pyln = None | |
| _HAVE_PYLN = False | |
| # ── Parametry (HARDCODE — user nie ma nic kręcić) ──────────────────────────── | |
| HPF_HZ = 75.0 # high-pass ~75 Hz: między 60–100 Hz wg agy; zdejmuje rumble, | |
| # nie wchodzi w pasmo głosu (F0 mowy ≳ 85 Hz nawet u basów). | |
| HPF_ORDER = 2 # Butterworth rz. 2 (~12 dB/oct). filtfilt podwaja rząd | |
| # efektywnie do ~4 → łagodny rolloff bez dzwonienia. | |
| TARGET_LUFS = -18.0 # cel integrated loudness. RVC lubi -18..-20, referencja | |
| # -19..-23; -18 to bezpieczny, gęsty wspólny punkt. | |
| PEAK_CEIL_DB = -1.0 # sufit szczytu (true-ish peak na próbkach): -1 dB zapas. | |
| # WARUNKOWY de-esser — DOMYŚLNIE WYŁĄCZONY. | |
| # DECYZJA PANELU: w trybie no-fiddle, w pełni automatycznym, de-essing w ciemno | |
| # jest ryzykowny — zbyt agresywny matuje spółgłoski (sz/cz/s), które AudioSR | |
| # DOPIERO ODBUDOWAŁ (to cały sens super-rozdzielczości tu). agy dopuszcza go tylko | |
| # „jeśli »s« jest ostre", a tego nie da się wiarygodnie ocenić bez ucha. Zostawiamy | |
| # kod (zachowawczy: tnie 6–8 kHz o ≤3 dB TYLKO gdy pasmo jest GORĄCE względem | |
| # reszty widma), ale wpięty za flagą — domyślnie OFF, by NIE dotykać barwy. | |
| DEESS_ENABLED = False | |
| DEESS_LO_HZ = 6000.0 | |
| DEESS_HI_HZ = 8000.0 | |
| DEESS_MAX_CUT_DB = 3.0 # nigdy nie tnij więcej niż 3 dB | |
| DEESS_HOT_MARGIN_DB = 6.0 # „gorące" = pasmo s o ≥6 dB nad medianą reszty widma | |
| _EPS = 1e-12 | |
| # ── Narzędzia podstawowe ───────────────────────────────────────────────────── | |
| def sanitize(wav: np.ndarray) -> np.ndarray: | |
| """Twardy strażnik NaN/Inf → 0 oraz gwarancja float32 contiguous 1-D-friendly. | |
| Wołane na WEJŚCIU process() i po KAŻDYM etapie DSP. Bez tego pojedynczy NaN/Inf | |
| w wyniku AudioSR (zdarza się przy dyfuzji na CPU) zatruwa np.mean (DC), filtfilt | |
| rozsmarowuje go po całym sygnale, a sf.write zapisuje plik z NaN-ami (cisza/trzask). | |
| nan_to_num zeruje też ±inf, więc dalsze obliczenia wzmocnienia są skończone.""" | |
| wav = np.asarray(wav, dtype=np.float32) | |
| if not np.all(np.isfinite(wav)): | |
| wav = np.nan_to_num(wav, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32) | |
| return wav | |
| def to_mono(wav: np.ndarray) -> np.ndarray: | |
| """Wymuś mono float32 1-D. (N,C) lub (C,N) → uśrednienie kanałów. | |
| Embeddery RVC oczekują mono; miks zamiast wyboru kanału unika utraty energii, | |
| a praca na sygnale już zsumowanym wyklucza kasowanie fazy w dalszych krokach. | |
| NaN/Inf czyszczone PRZED uśrednieniem — inaczej jeden zły kanał psuje mono.""" | |
| wav = sanitize(wav) | |
| if wav.ndim == 1: | |
| return wav | |
| # heurystyka osi kanałów: kanałów jest mało (≤8), próbek dużo. | |
| if wav.ndim == 2: | |
| if wav.shape[0] <= 8 and wav.shape[0] < wav.shape[1]: | |
| wav = wav.T # (C,N) → (N,C) | |
| return sanitize(wav.mean(axis=1)) | |
| return sanitize(wav.reshape(-1)) | |
| def remove_dc(wav: np.ndarray) -> np.ndarray: | |
| """Usuń składową stałą (DC offset) — odjęcie średniej.""" | |
| wav = np.asarray(wav, dtype=np.float32) | |
| return (wav - float(np.mean(wav))).astype(np.float32) | |
| def highpass(wav: np.ndarray, sr: int, cutoff_hz: float = HPF_HZ, | |
| order: int = HPF_ORDER) -> np.ndarray: | |
| """Zero-phase Butterworth high-pass (filtfilt). | |
| filtfilt filtruje w przód i w tył → ZEROWE przesunięcie fazy (brak rozmycia | |
| transjentów spółgłosek) kosztem podwojenia efektywnego rzędu. Współczynniki | |
| liczone w SOS (drugie sekcje) → stabilne numerycznie przy fs=48000.""" | |
| if not _HAVE_SCIPY: | |
| return sanitize(wav) # bez scipy: pomiń HPF (degradacja, NIE crash) | |
| wav = sanitize(wav).astype(np.float64) # filtfilt liczy w float64 dla stabilności | |
| nyq = 0.5 * sr | |
| wn = cutoff_hz / nyq | |
| if not (0.0 < wn < 1.0): | |
| return wav.astype(np.float32) | |
| # filtfilt wymaga sygnału dłuższego niż „padlen". Dla SOS scipy domyślnie bierze | |
| # padlen = 3*(2*n_sekcji+1); dla krótkich klipów dobierz mniejszy, a w ostateczności | |
| # pomiń filtr, żeby nie wywalić przetwarzania na ułamkowo-sekundowej próbce. | |
| sos = butter(order, wn, btype="highpass", output="sos") | |
| default_padlen = 3 * (2 * sos.shape[0] + 1) # ZGODNY z domyślnym padlen scipy dla SOS | |
| if wav.shape[0] <= default_padlen: | |
| padlen = max(0, wav.shape[0] - 1) | |
| if padlen < 6: # zbyt krótko, by sensownie filtrować | |
| return wav.astype(np.float32) | |
| try: | |
| return sanitize(sosfiltfilt(sos, wav, padlen=padlen)) | |
| except Exception: | |
| return wav.astype(np.float32) | |
| return sanitize(sosfiltfilt(sos, wav)) | |
| # ── WARUNKOWY de-esser (domyślnie OFF — patrz DEESS_ENABLED) ───────────────── | |
| def _band_db_above_rest(wav: np.ndarray, sr: int, lo: float, hi: float) -> float: | |
| """O ile dB pasmo [lo,hi] góruje nad MEDIANĄ reszty widma (miara „gorącości").""" | |
| n = 1 << int(np.ceil(np.log2(max(len(wav), 2)))) | |
| n = min(n, 1 << 16) | |
| seg = wav[:n] if len(wav) >= n else np.pad(wav, (0, n - len(wav))) | |
| mag = np.abs(np.fft.rfft(seg * np.hanning(len(seg)))) | |
| f = np.fft.rfftfreq(len(seg), 1.0 / sr) | |
| band = (f >= lo) & (f <= hi) | |
| rest = (f > 200.0) & ~band | |
| if not band.any() or not rest.any(): | |
| return 0.0 | |
| band_e = float(np.sqrt(np.mean(mag[band] ** 2)) + _EPS) | |
| rest_e = float(np.median(mag[rest]) + _EPS) | |
| return 20.0 * np.log10(band_e / rest_e) | |
| def conditional_deesser(wav: np.ndarray, sr: int) -> np.ndarray: | |
| """Tnij 6–8 kHz o ≤3 dB TYLKO gdy pasmo jest wyraźnie gorące. Zachowawcze.""" | |
| if not DEESS_ENABLED or not _HAVE_SCIPY: | |
| return wav | |
| hot = _band_db_above_rest(wav, sr, DEESS_LO_HZ, DEESS_HI_HZ) | |
| if hot < DEESS_HOT_MARGIN_DB: | |
| return wav # nie gorące → nie ruszaj (nie psuj odbudowanej góry) | |
| # ile ciąć: proporcjonalnie do przekroczenia, ale max DEESS_MAX_CUT_DB | |
| cut_db = min(DEESS_MAX_CUT_DB, (hot - DEESS_HOT_MARGIN_DB) * 0.5 + 1.0) | |
| nyq = 0.5 * sr | |
| lo, hi = DEESS_LO_HZ / nyq, min(DEESS_HI_HZ, nyq * 0.99) / nyq | |
| if not (0.0 < lo < hi < 1.0): | |
| return wav | |
| b, a = butter(2, [lo, hi], btype="bandstop") # wycina pasmo | |
| # bandstop usuwa pasmo całkowicie; by uzyskać DELIKATNY cut o cut_db, mieszamy | |
| # sygnał oryginalny z przefiltrowanym (notch) proporcjonalnie. | |
| notched = filtfilt(b, a, wav.astype(np.float64)) | |
| g = 10.0 ** (-cut_db / 20.0) # docelowa amplituda pasma s | |
| mix = g * wav + (1.0 - g) * notched | |
| return mix.astype(np.float32) | |
| # ── Pomiar głośności (LUFS) z odpornym fallbackiem ─────────────────────────── | |
| def measure_lufs(wav: np.ndarray, sr: int): | |
| """Integrated loudness wg BS.1770-4 (pyloudnorm). Zwraca (lufs, method). | |
| Odporność: | |
| • pyloudnorm.Meter wymaga sygnału dłuższego niż block_size (domyślnie 400 ms) | |
| — dla krótszych klipów próbujemy zmniejszać block_size (do ~50 ms). | |
| • cisza/near-silence → integrated_loudness = -inf → traktujemy jak brak | |
| sygnału i sygnalizujemy fallbackiem (gain nie zostanie policzony z -inf). | |
| • brak pyloudnorm w środowisku → RMS-fallback. | |
| """ | |
| wav = np.asarray(wav, dtype=np.float64) | |
| if _HAVE_PYLN and wav.size: | |
| dur = wav.shape[0] / float(sr) | |
| # dobierz block_size: domyślnie 0.4 s; dla krótkich klipów zmniejsz tak, | |
| # by zmieścić co najmniej jeden pełny blok. | |
| for block in (0.4, 0.2, 0.1, 0.05): | |
| if dur <= block: | |
| continue | |
| try: | |
| meter = pyln.Meter(sr, block_size=block) | |
| lufs = float(meter.integrated_loudness(wav)) | |
| if np.isfinite(lufs): | |
| return lufs, ("bs1770" if block == 0.4 else f"bs1770@{block:g}s") | |
| except Exception: | |
| continue | |
| # ── Fallback RMS → przybliżony LUFS ────────────────────────────────────── | |
| # Dla mono, bez K-weightingu, BS.1770 sprowadza się do: | |
| # loudness ≈ -0.691 + 10*log10(mean(x^2)) (offset bramki absolutnej -70 LKFS | |
| # pomijamy — dla pojedynczej próbki mowy nie ma ciszy do zbramkowania). | |
| ms = float(np.mean(wav ** 2)) | |
| if ms <= _EPS: | |
| return float("-inf"), "silence" | |
| lufs = -0.691 + 10.0 * np.log10(ms) | |
| return lufs, "rms-fallback" | |
| def lufs_normalize(wav: np.ndarray, sr: int, target_lufs: float = TARGET_LUFS): | |
| """Skaluj sygnał do docelowego LUFS (stała zmiana wzmocnienia). (wav, info).""" | |
| wav = np.asarray(wav, dtype=np.float32) | |
| lufs, method = measure_lufs(wav, sr) | |
| if not np.isfinite(lufs): | |
| # cisza / brak sygnału → nie ruszaj (uniknij gain = +inf) | |
| return wav, {"in_lufs": lufs, "out_lufs": lufs, "gain_db": 0.0, "method": method} | |
| gain_db = target_lufs - lufs | |
| gain = 10.0 ** (gain_db / 20.0) | |
| out = (wav * gain).astype(np.float32) | |
| return out, {"in_lufs": lufs, "out_lufs": target_lufs, "gain_db": gain_db, "method": method} | |
| # ── Sufit szczytu: WYŁĄCZNIE redukcja wzmocnienia (bez limitera) ───────────── | |
| def peak_ceiling(wav: np.ndarray, ceil_db: float = PEAK_CEIL_DB): | |
| """Zejdź wzmocnieniem tak, by szczyt = ceil_db, ALE TYLKO gdy szczyt jest WYŻEJ. | |
| To czysta zmiana skali całego sygnału (jeden mnożnik), nigdy nie zniekształca | |
| kształtu fali → ZERO kompresji/limitera/clipingu (te myliłyby embeddery). | |
| Jeśli szczyt jest już poniżej sufitu — NIE podbijamy (LUFS już ustawił poziom).""" | |
| wav = np.asarray(wav, dtype=np.float32) | |
| peak = float(np.max(np.abs(wav))) if wav.size else 0.0 | |
| if peak <= _EPS: | |
| return wav, {"peak_db_in": float("-inf"), "peak_db_out": float("-inf"), "gain_db": 0.0} | |
| ceil_lin = 10.0 ** (ceil_db / 20.0) | |
| peak_db_in = 20.0 * np.log10(peak) | |
| if peak <= ceil_lin: | |
| return wav, {"peak_db_in": peak_db_in, "peak_db_out": peak_db_in, "gain_db": 0.0} | |
| g = ceil_lin / peak | |
| out = (wav * g).astype(np.float32) | |
| return out, {"peak_db_in": peak_db_in, "peak_db_out": ceil_db, "gain_db": 20.0 * np.log10(g)} | |
| # ── Ścieżka OFF: dopasowanie RMS do oryginału (zachowanie dotychczasowe) ────── | |
| def rms_match(wav: np.ndarray, src_path: str, safety: float = 0.99): | |
| """Dopasuj RMS wyjścia do oryginału + bezpiecznik szczytu 0.99 (bez HPF/LUFS).""" | |
| wav = np.asarray(wav, dtype=np.float32) | |
| try: | |
| src, _ = sf.read(src_path, dtype="float32") | |
| src = to_mono(src) | |
| r_in = float(np.sqrt(np.mean(src ** 2))) + 1e-9 | |
| r_out = float(np.sqrt(np.mean(wav ** 2))) + 1e-9 | |
| wav = wav * (r_in / r_out) | |
| except Exception: | |
| pass # gdy nie da się odczytać źródła — zostaw poziom, zadziała tylko bezpiecznik | |
| pk = float(np.max(np.abs(wav))) if wav.size else 0.0 | |
| if pk > safety: | |
| wav = wav / pk * safety | |
| return wav.astype(np.float32) | |
| # ── JEDYNE wejście, które woła app.py ──────────────────────────────────────── | |
| def process(wav: np.ndarray, sr: int, src_path: str, normalize: bool): | |
| """Przetwórz sygnał PO AudioSR. Zwraca (wav_float32_mono, info: dict). | |
| normalize=True : mono → DC → high-pass 75 Hz → [de-ess warunkowo] → | |
| LUFS -18 → sufit szczytu -1 dB (gain-down only). | |
| normalize=False: mono → dopasowanie RMS do src + bezpiecznik 0.99. | |
| Zawsze zwraca mono (embeddery RVC). Próbkowanie zostaje sr (app pisze 48 kHz); | |
| RVC sam zresampluje do 32k/40k w preprocesie Applio — nie schodzimy w dół tu. | |
| """ | |
| wav = to_mono(wav) | |
| info = {"normalize": bool(normalize), "sr": int(sr), "mono": True} | |
| if not normalize: | |
| out = sanitize(rms_match(wav, src_path)) | |
| info["path"] = "off/rms-match" | |
| info["peak_db_out"] = (20.0 * np.log10(float(np.max(np.abs(out))) + _EPS) | |
| if out.size else float("-inf")) | |
| return out, info | |
| out = remove_dc(wav) | |
| out = highpass(out, sr) | |
| out = conditional_deesser(out, sr) # no-op gdy DEESS_ENABLED=False | |
| out, lu = lufs_normalize(out, sr) | |
| out, pk = peak_ceiling(out) | |
| out = sanitize(out) # ostateczny strażnik przed sf.write | |
| # PRAWDZIWY osiągnięty LUFS: sufit szczytu działa PO LUFS i przy pojedynczym | |
| # głośnym transjencie (plozja/klik) może zejść wzmocnieniem tyle, że cel -18 NIE | |
| # zostaje dotrzymany. Zmierz po fakcie i policz braki — app.py to UJAWNI userowi | |
| # (zamiast cicho udawać, że -18 wszedł). Patrz: dyscyplina „diagnose-silent-skip". | |
| out_lufs_actual, _m = measure_lufs(out, sr) | |
| lu["out_lufs_actual"] = out_lufs_actual | |
| info["path"] = "on/dc+hpf+lufs+peak" | |
| info["deess"] = DEESS_ENABLED | |
| info["lufs"] = lu | |
| info["peak"] = pk | |
| info["lufs_shortfall_db"] = (float(TARGET_LUFS - out_lufs_actual) | |
| if np.isfinite(out_lufs_actual) else 0.0) | |
| return out, info | |
| # ── Samotest (uruchom: python postproc.py) ─────────────────────────────────── | |
| if __name__ == "__main__": | |
| sr = 48000 | |
| rng = np.random.default_rng(42) | |
| t = np.arange(int(2.0 * sr)) / sr | |
| # ton 200 Hz + lekki szum + CELOWY DC offset, by udowodnić jego usunięcie | |
| tone = 0.20 * np.sin(2 * np.pi * 200 * t) | |
| noise = 0.01 * rng.standard_normal(t.shape).astype(np.float32) | |
| rumble = 0.05 * np.sin(2 * np.pi * 35 * t) # podziemny rumble < 75 Hz (HPF go zdejmie) | |
| dc = 0.03 | |
| x = (tone + noise + rumble + dc).astype(np.float32) | |
| print("=== WEJSCIE ===") | |
| li, mi = measure_lufs(x, sr) | |
| print(f"DC offset in : {np.mean(x):+.5f}") | |
| print(f"peak in : {20*np.log10(np.max(np.abs(x))+_EPS):+.2f} dBFS") | |
| print(f"LUFS in : {li:.2f} ({mi})") | |
| out, info = process(x, sr, src_path="<none>", normalize=True) | |
| lo, mo = measure_lufs(out, sr) | |
| print("\n=== WYJSCIE (normalize=ON) ===") | |
| print(f"DC offset out: {np.mean(out):+.5f} (ma byc ~0)") | |
| print(f"peak out : {20*np.log10(np.max(np.abs(out))+_EPS):+.2f} dBFS (cel <= {PEAK_CEIL_DB:+.1f})") | |
| print(f"LUFS out : {lo:.2f} (cel {TARGET_LUFS:+.1f}, {mo})") | |
| print(f"HPF gain @35Hz: rumble tłumiony — RMS in={np.sqrt(np.mean(x**2)):.4f} " | |
| f"out={np.sqrt(np.mean(out**2)):.4f}") | |
| print(f"info: {info}") | |
| # asercje poprawności DSP | |
| assert abs(float(np.mean(out))) < 1e-3, "DC nie usunięte" | |
| assert 20*np.log10(np.max(np.abs(out))+_EPS) <= PEAK_CEIL_DB + 0.1, "szczyt przekracza sufit" | |
| if np.isfinite(lo): | |
| assert abs(lo - TARGET_LUFS) < 1.5, f"LUFS poza celem: {lo}" | |
| print("\nOK: DC usunięte, szczyt pod sufitem, LUFS w celu.") | |
| # ── Matryca przypadków brzegowych: ŻADEN nie może wywalić procesu ani zwrócić | |
| # NaN/Inf/stereo. Każdy musi zwrócić mono float32, skończony, szczyt pod sufitem. | |
| import os | |
| import tempfile | |
| def synth(dur, level=0.2, dc=0.0, stereo=False, freq=220.0): | |
| tt = np.arange(int(sr * dur)) / sr | |
| s = 0.7 * np.sin(2 * np.pi * freq * tt) + 0.3 * rng.standard_normal(tt.size) | |
| s = (s / (np.max(np.abs(s)) + 1e-9) * level + dc).astype(np.float32) | |
| return np.stack([s, s * 0.9], axis=1) if stereo else s | |
| # plik źródłowy dla ścieżki OFF (rms_match czyta src_path) | |
| src_path = os.path.join(tempfile.gettempdir(), "_postproc_selftest_src.wav") | |
| sf.write(src_path, synth(2.0, 0.15), sr) | |
| bad = synth(1.0) | |
| bad[10], bad[20], bad[30] = np.nan, np.inf, -np.inf | |
| cases = [ | |
| ("stereo 2.0s", synth(2.0, 0.2, stereo=True), True, "<none>"), | |
| ("near-silent 1.0s", synth(1.0, 1e-7), True, "<none>"), | |
| ("cisza 1.0s", np.zeros(sr, np.float32), True, "<none>"), | |
| ("już-przesterowany x3", synth(1.0, 1.0) * 3.0, True, "<none>"), | |
| ("NaN/Inf 1.0s", bad, True, "<none>"), | |
| ("ultra-krótki 10ms", synth(0.01, 0.2), True, "<none>"), | |
| ("krótki 200ms", synth(0.2, 0.2), True, "<none>"), | |
| ("OFF-path 2.0s", synth(2.0, 0.2, dc=0.05), False, src_path), | |
| ("OFF przesterowany x3", synth(1.0, 1.0) * 3.0, False, src_path), | |
| ("OFF zły src 2.0s", synth(2.0, 0.2), False, "<brak>"), | |
| ] | |
| print("\n=== MATRYCA PRZYPADKÓW BRZEGOWYCH ===") | |
| all_ok = True | |
| for name, w, norm, sp in cases: | |
| out, inf = process(w, sr, src_path=sp, normalize=norm) | |
| finite = bool(np.all(np.isfinite(out))) | |
| mono = out.ndim == 1 | |
| f32 = out.dtype == np.float32 | |
| peak = float(np.max(np.abs(out))) if out.size else 0.0 | |
| ceil = 10.0 ** (PEAK_CEIL_DB / 20.0) if norm else 0.99 | |
| within = peak <= ceil + 1e-3 | |
| good = finite and mono and f32 and within | |
| all_ok = all_ok and good | |
| print(f" {'OK ' if good else '!! '}{name:22s} finite={finite} mono={mono} " | |
| f"f32={f32} peak={peak:.4f} within={within} " | |
| f"method={inf.get('lufs', {}).get('method', inf.get('path'))}") | |
| assert good, f"przypadek brzegowy zawiódł: {name}" | |
| print("\nWSZYSTKIE PRZYPADKI BRZEGOWE:", "PASS" if all_ok else "FAIL") | |